mirror of
https://github.com/corda/corda.git
synced 2025-03-14 00:06:45 +00:00
Merge pull request #1192 from corda/mnesbit-merge-20180703
Merge up of AMQP refactor from OS
This commit is contained in:
commit
507ebf5a0c
@ -22,11 +22,11 @@ import net.corda.core.internal.div
|
||||
import net.corda.core.internal.readAll
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.crypto.X509KeyStore
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||
@ -35,6 +35,7 @@ import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.security.KeyStore
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class AMQPListenerTest {
|
||||
@ -87,16 +88,17 @@ class AMQPListenerTest {
|
||||
clientConfig.createBridgeKeyStores(DUMMY_BANK_B_NAME)
|
||||
val clientKeyStore = clientConfig.loadSslKeyStore().internal
|
||||
val clientTrustStore = clientConfig.loadTrustStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeyStore
|
||||
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = clientTrustStore
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
override val trace: Boolean = true
|
||||
}
|
||||
// create and connect a real client
|
||||
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)),
|
||||
setOf(DUMMY_BANK_A_NAME),
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
clientKeyStore,
|
||||
clientConfig.keyStorePassword,
|
||||
clientTrustStore,
|
||||
true,
|
||||
maxMessageSize = maxMessageSize)
|
||||
amqpConfig)
|
||||
|
||||
amqpClient.start()
|
||||
// Should see events to show we got a valid connection
|
||||
@ -158,16 +160,17 @@ class AMQPListenerTest {
|
||||
clientKeyStore.setPrivateKey("TLS_CERT", clientKeys.private, listOf(clientCert))
|
||||
val clientTrustStore = X509KeyStore("password")
|
||||
clientTrustStore.setCertificate("TLS_ROOT", clientCert)
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeyStore.internal
|
||||
override val keyStorePrivateKeyPassword: CharArray = "password".toCharArray()
|
||||
override val trustStore: KeyStore = clientTrustStore.internal
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
override val trace: Boolean = true
|
||||
}
|
||||
// create and connect a real client
|
||||
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)),
|
||||
setOf(DUMMY_BANK_A_NAME),
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
clientKeyStore.internal,
|
||||
"password",
|
||||
clientTrustStore.internal,
|
||||
true,
|
||||
maxMessageSize = maxMessageSize)
|
||||
amqpConfig)
|
||||
amqpClient.start()
|
||||
val connectionEvent = connectionFollower.next()
|
||||
assertEquals(false, connectionEvent.connected)
|
||||
|
@ -17,9 +17,9 @@ import net.corda.bridge.services.api.ServiceStateSupport
|
||||
import net.corda.bridge.services.util.ServiceStateCombiner
|
||||
import net.corda.bridge.services.util.ServiceStateHelper
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.crypto.KEYSTORE_TYPE
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
||||
import org.slf4j.LoggerFactory
|
||||
@ -31,7 +31,7 @@ import java.security.KeyStore
|
||||
import java.util.*
|
||||
|
||||
class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
|
||||
val maxMessageSize: Int,
|
||||
val maximumMessageSize: Int,
|
||||
val auditService: BridgeAuditService,
|
||||
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeAMQPListenerService, ServiceStateSupport by stateHelper {
|
||||
companion object {
|
||||
@ -61,16 +61,17 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
|
||||
val keyStore = loadKeyStoreAndWipeKeys(keyStoreBytes, keyStorePassword)
|
||||
val trustStore = loadKeyStoreAndWipeKeys(trustStoreBytes, trustStorePassword)
|
||||
val bindAddress = conf.inboundConfig!!.listeningAddress
|
||||
val amqpConfiguration = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = keyStore
|
||||
override val keyStorePrivateKeyPassword: CharArray = keyStorePrivateKeyPassword
|
||||
override val trustStore: KeyStore = trustStore
|
||||
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
|
||||
override val maxMessageSize: Int = maximumMessageSize
|
||||
override val trace: Boolean = conf.enableAMQPPacketTrace
|
||||
}
|
||||
val server = AMQPServer(bindAddress.host,
|
||||
bindAddress.port,
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
keyStore,
|
||||
keyStorePrivateKeyPassword,
|
||||
trustStore,
|
||||
conf.crlCheckSoftFail,
|
||||
maxMessageSize,
|
||||
conf.enableAMQPPacketTrace)
|
||||
amqpConfiguration)
|
||||
onConnectSubscription = server.onConnection.subscribe(_onConnection)
|
||||
onConnectAuditSubscription = server.onConnection.subscribe({
|
||||
if (it.connected) {
|
||||
|
@ -24,6 +24,7 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
||||
import rx.Subscription
|
||||
@ -33,7 +34,7 @@ import kotlin.concurrent.withLock
|
||||
|
||||
|
||||
class FloatControlListenerService(val conf: BridgeConfiguration,
|
||||
val maxMessageSize: Int,
|
||||
val maximumMessageSize: Int,
|
||||
val auditService: BridgeAuditService,
|
||||
val amqpListener: BridgeAMQPListenerService,
|
||||
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatControlService, ServiceStateSupport by stateHelper {
|
||||
@ -49,9 +50,6 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
|
||||
private var receiveSubscriber: Subscription? = null
|
||||
private var amqpControlServer: AMQPServer? = null
|
||||
private val sslConfiguration: BridgeSSLConfiguration
|
||||
private val keyStore: KeyStore
|
||||
private val keyStorePrivateKeyPassword: String
|
||||
private val trustStore: KeyStore
|
||||
private val floatControlAddress = conf.floatOuterConfig!!.floatAddress
|
||||
private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject
|
||||
private var activeConnectionInfo: ConnectionChange? = null
|
||||
@ -61,9 +59,6 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
|
||||
init {
|
||||
statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener))
|
||||
sslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: BridgeSSLConfigurationImpl(conf)
|
||||
keyStore = sslConfiguration.loadSslKeyStore().internal
|
||||
keyStorePrivateKeyPassword = sslConfiguration.keyStorePassword
|
||||
trustStore = sslConfiguration.loadTrustStore().internal
|
||||
}
|
||||
|
||||
|
||||
@ -83,16 +78,22 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
|
||||
|
||||
private fun startControlListener() {
|
||||
lock.withLock {
|
||||
val keyStore = sslConfiguration.loadSslKeyStore().internal
|
||||
val keyStorePrivateKeyPassword = sslConfiguration.keyStorePassword
|
||||
val trustStore = sslConfiguration.loadTrustStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val userName: String? = null
|
||||
override val password: String? = null
|
||||
override val keyStore: KeyStore = keyStore
|
||||
override val keyStorePrivateKeyPassword: CharArray = keyStorePrivateKeyPassword.toCharArray()
|
||||
override val trustStore: KeyStore = trustStore
|
||||
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
|
||||
override val maxMessageSize: Int = maximumMessageSize
|
||||
override val trace: Boolean = conf.enableAMQPPacketTrace
|
||||
}
|
||||
val controlServer = AMQPServer(floatControlAddress.host,
|
||||
floatControlAddress.port,
|
||||
null,
|
||||
null,
|
||||
keyStore,
|
||||
keyStorePrivateKeyPassword,
|
||||
trustStore,
|
||||
conf.crlCheckSoftFail,
|
||||
maxMessageSize,
|
||||
conf.enableAMQPPacketTrace)
|
||||
amqpConfig)
|
||||
connectSubscriber = controlServer.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
|
||||
receiveSubscriber = controlServer.onReceive.subscribe({ onControlMessage(it) }, { log.error("Receive event error", it) })
|
||||
amqpControlServer = controlServer
|
||||
|
@ -27,6 +27,7 @@ import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
||||
import rx.Subscription
|
||||
import java.io.ByteArrayOutputStream
|
||||
@ -36,7 +37,7 @@ import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
|
||||
val maxMessageSize: Int,
|
||||
val maximumMessageSize: Int,
|
||||
val auditService: BridgeAuditService,
|
||||
haService: BridgeMasterService,
|
||||
val filterService: IncomingMessageFilterService,
|
||||
@ -52,9 +53,6 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
|
||||
private var amqpControlClient: AMQPClient? = null
|
||||
private val controlLinkSSLConfiguration: SSLConfiguration
|
||||
private val floatListenerSSLConfiguration: SSLConfiguration
|
||||
private val controlLinkKeyStore: KeyStore
|
||||
private val controLinkKeyStorePrivateKeyPassword: String
|
||||
private val controlLinkTrustStore: KeyStore
|
||||
private val expectedCertificateSubject: CordaX500Name
|
||||
private val secureRandom: SecureRandom = newSecureRandom()
|
||||
|
||||
@ -62,9 +60,6 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
|
||||
statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService))
|
||||
controlLinkSSLConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf
|
||||
floatListenerSSLConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf
|
||||
controlLinkKeyStore = controlLinkSSLConfiguration.loadSslKeyStore().internal
|
||||
controLinkKeyStorePrivateKeyPassword = controlLinkSSLConfiguration.keyStorePassword
|
||||
controlLinkTrustStore = controlLinkSSLConfiguration.loadTrustStore().internal
|
||||
expectedCertificateSubject = conf.bridgeInnerConfig!!.expectedCertificateSubject
|
||||
}
|
||||
|
||||
@ -73,16 +68,23 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
|
||||
statusSubscriber = statusFollower.activeChange.subscribe({
|
||||
if (it) {
|
||||
val floatAddresses = conf.bridgeInnerConfig!!.floatAddresses
|
||||
val controlLinkKeyStore = controlLinkSSLConfiguration.loadSslKeyStore().internal
|
||||
val controLinkKeyStorePrivateKeyPassword = controlLinkSSLConfiguration.keyStorePassword
|
||||
val controlLinkTrustStore = controlLinkSSLConfiguration.loadTrustStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val userName: String? = null
|
||||
override val password: String? = null
|
||||
override val keyStore: KeyStore = controlLinkKeyStore
|
||||
override val keyStorePrivateKeyPassword: CharArray = controLinkKeyStorePrivateKeyPassword.toCharArray()
|
||||
override val trustStore: KeyStore = controlLinkTrustStore
|
||||
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
|
||||
override val maxMessageSize: Int = maximumMessageSize
|
||||
override val trace: Boolean = conf.enableAMQPPacketTrace
|
||||
|
||||
}
|
||||
val controlClient = AMQPClient(floatAddresses,
|
||||
setOf(expectedCertificateSubject),
|
||||
null,
|
||||
null,
|
||||
controlLinkKeyStore,
|
||||
controLinkKeyStorePrivateKeyPassword,
|
||||
controlLinkTrustStore,
|
||||
conf.crlCheckSoftFail,
|
||||
maxMessageSize,
|
||||
conf.enableAMQPPacketTrace)
|
||||
amqpConfig)
|
||||
connectSubscriber = controlClient.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
|
||||
receiveSubscriber = controlClient.onReceive.subscribe({ onFloatMessage(it) }, { log.error("Receive event error", it) })
|
||||
amqpControlClient = controlClient
|
||||
|
@ -21,13 +21,13 @@ import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
||||
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
@ -48,17 +48,27 @@ import kotlin.concurrent.withLock
|
||||
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConfig: SocksProxyConfig? = null,
|
||||
private val maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||
class AMQPBridgeManager(config: NodeSSLConfiguration, socksProxyConfig: SocksProxyConfig? = null,
|
||||
maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||
|
||||
private val lock = ReentrantLock()
|
||||
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
||||
|
||||
private class AMQPConfigurationImpl private constructor(override val keyStore: KeyStore,
|
||||
override val keyStorePrivateKeyPassword: CharArray,
|
||||
override val trustStore: KeyStore,
|
||||
override val socksProxyConfig: SocksProxyConfig?,
|
||||
override val maxMessageSize: Int) : AMQPConfiguration {
|
||||
constructor(config: NodeSSLConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.loadSslKeyStore().internal,
|
||||
config.keyStorePassword.toCharArray(),
|
||||
config.loadTrustStore().internal,
|
||||
socksProxyConfig,
|
||||
maxMessageSize)
|
||||
}
|
||||
|
||||
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, socksProxyConfig, maxMessageSize)
|
||||
private var sharedEventLoopGroup: EventLoopGroup? = null
|
||||
private val keyStore = config.loadSslKeyStore().internal
|
||||
private val keyStorePrivateKeyPassword: String = config.keyStorePassword
|
||||
private val trustStore = config.loadTrustStore().internal
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
private val crlCheckSoftFail: Boolean = config.crlCheckSoftFail
|
||||
|
||||
constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
|
||||
@ -78,27 +88,26 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
private class AMQPBridge(private val queueName: String,
|
||||
private val target: NetworkHostAndPort,
|
||||
private val legalNames: Set<CordaX500Name>,
|
||||
keyStore: KeyStore,
|
||||
keyStorePrivateKeyPassword: String,
|
||||
trustStore: KeyStore,
|
||||
crlCheckSoftFail: Boolean,
|
||||
private val amqpConfig: AMQPConfiguration,
|
||||
sharedEventGroup: EventLoopGroup,
|
||||
socksProxyConfig: SocksProxyConfig?,
|
||||
private val artemis: ArtemisSessionProvider,
|
||||
private val maxMessageSize: Int) {
|
||||
private val artemis: ArtemisSessionProvider) {
|
||||
companion object {
|
||||
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
MDC.put("queueName", queueName)
|
||||
MDC.put("target", target.toString())
|
||||
MDC.put("bridgeName", bridgeName)
|
||||
MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("maxMessageSize", maxMessageSize.toString())
|
||||
block()
|
||||
MDC.clear()
|
||||
val oldMDC = MDC.getCopyOfContextMap()
|
||||
try {
|
||||
MDC.put("queueName", queueName)
|
||||
MDC.put("target", target.toString())
|
||||
MDC.put("bridgeName", bridgeName)
|
||||
MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("maxMessageSize", amqpConfig.maxMessageSize.toString())
|
||||
block()
|
||||
} finally {
|
||||
MDC.setContextMap(oldMDC)
|
||||
}
|
||||
}
|
||||
|
||||
private fun logDebugWithMDC(msg: () -> String) {
|
||||
@ -111,8 +120,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
|
||||
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
||||
|
||||
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail,
|
||||
sharedThreadPool = sharedEventGroup, socksProxyConfig = socksProxyConfig, maxMessageSize = maxMessageSize)
|
||||
val amqpClient = AMQPClient(listOf(target), legalNames, amqpConfig, sharedThreadPool = sharedEventGroup)
|
||||
val bridgeName: String get() = getBridgeName(queueName, target)
|
||||
private val lock = ReentrantLock() // lock to serialise session level access
|
||||
private var session: ClientSession? = null
|
||||
@ -180,8 +188,8 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
}
|
||||
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
if (artemisMessage.bodySize > maxMessageSize) {
|
||||
logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " +
|
||||
if (artemisMessage.bodySize > amqpConfig.maxMessageSize) {
|
||||
logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [${amqpConfig.maxMessageSize}], message size: [${artemisMessage.bodySize}], " +
|
||||
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
// Ack the message to prevent same message being sent to us again.
|
||||
artemisMessage.acknowledge()
|
||||
@ -229,9 +237,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
if (bridgeExists(getBridgeName(queueName, target))) {
|
||||
return
|
||||
}
|
||||
|
||||
val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedEventLoopGroup!!, socksProxyConfig, artemis!!, maxMessageSize)
|
||||
|
||||
val newBridge = AMQPBridge(queueName, target, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!)
|
||||
lock.withLock {
|
||||
bridgeNameToBridgeMap[newBridge.bridgeName] = newBridge
|
||||
}
|
||||
|
@ -57,11 +57,15 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
}
|
||||
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
MDC.put("serverMode", serverMode.toString())
|
||||
MDC.put("localLegalName", localLegalName)
|
||||
MDC.put("remoteLegalName", remoteLegalName)
|
||||
block()
|
||||
MDC.clear()
|
||||
val oldMDC = MDC.getCopyOfContextMap()
|
||||
try {
|
||||
MDC.put("serverMode", serverMode.toString())
|
||||
MDC.put("localLegalName", localLegalName)
|
||||
MDC.put("remoteLegalName", remoteLegalName)
|
||||
block()
|
||||
} finally {
|
||||
MDC.setContextMap(oldMDC)
|
||||
}
|
||||
}
|
||||
|
||||
private fun logDebugWithMDC(msg: () -> String) {
|
||||
|
@ -51,11 +51,15 @@ internal class EventProcessor(channel: Channel,
|
||||
}
|
||||
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
MDC.put("serverMode", serverMode.toString())
|
||||
MDC.put("localLegalName", localLegalName)
|
||||
MDC.put("remoteLegalName", remoteLegalName)
|
||||
block()
|
||||
MDC.clear()
|
||||
val oldMDC = MDC.getCopyOfContextMap()
|
||||
try {
|
||||
MDC.put("serverMode", serverMode.toString())
|
||||
MDC.put("localLegalName", localLegalName)
|
||||
MDC.put("remoteLegalName", remoteLegalName)
|
||||
block()
|
||||
} finally {
|
||||
MDC.setContextMap(oldMDC)
|
||||
}
|
||||
}
|
||||
|
||||
private fun logDebugWithMDC(msg: () -> String) {
|
||||
|
@ -35,6 +35,7 @@ import org.slf4j.MDC
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.channels.ClosedChannelException
|
||||
import java.security.cert.X509Certificate
|
||||
import javax.net.ssl.SSLException
|
||||
|
||||
/**
|
||||
* An instance of AMQPChannelHandler sits inside the netty pipeline and controls the socket level lifecycle.
|
||||
@ -61,13 +62,17 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
private var badCert: Boolean = false
|
||||
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
MDC.put("serverMode", serverMode.toString())
|
||||
MDC.put("remoteAddress", remoteAddress.toString())
|
||||
MDC.put("localCert", localCert?.subjectDN?.toString())
|
||||
MDC.put("remoteCert", remoteCert?.subjectDN?.toString())
|
||||
MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames?.joinToString(separator = ";") { it.toString() })
|
||||
block()
|
||||
MDC.clear()
|
||||
val oldMDC = MDC.getCopyOfContextMap()
|
||||
try {
|
||||
MDC.put("serverMode", serverMode.toString())
|
||||
MDC.put("remoteAddress", remoteAddress.toString())
|
||||
MDC.put("localCert", localCert?.subjectDN?.toString())
|
||||
MDC.put("remoteCert", remoteCert?.subjectDN?.toString())
|
||||
MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames?.joinToString(separator = ";") { it.toString() })
|
||||
block()
|
||||
} finally {
|
||||
MDC.setContextMap(oldMDC)
|
||||
}
|
||||
}
|
||||
|
||||
private fun logDebugWithMDC(msg: () -> String) {
|
||||
@ -147,9 +152,12 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
createAMQPEngine(ctx)
|
||||
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
|
||||
} else {
|
||||
val cause = evt.cause()
|
||||
// This happens when the peer node is closed during SSL establishment.
|
||||
if (evt.cause() is ClosedChannelException) {
|
||||
if (cause is ClosedChannelException) {
|
||||
logWarnWithMDC("SSL Handshake closed early.")
|
||||
} else if (cause is SSLException && cause.message == "handshake timed out") { // Sadly the exception thrown by Netty wrapper requires that we check the message.
|
||||
logWarnWithMDC("SSL Handshake timed out")
|
||||
} else {
|
||||
badCert = true
|
||||
}
|
||||
|
@ -30,9 +30,8 @@ import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImp
|
||||
import net.corda.nodeapi.internal.requireMessageSize
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.net.InetSocketAddress
|
||||
import java.lang.Long.min
|
||||
import java.security.KeyStore
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
@ -61,16 +60,8 @@ data class SocksProxyConfig(val version: SocksProxyVersion, val proxyAddress: Ne
|
||||
*/
|
||||
class AMQPClient(val targets: List<NetworkHostAndPort>,
|
||||
val allowedRemoteLegalNames: Set<CordaX500Name>,
|
||||
private val userName: String?,
|
||||
private val password: String?,
|
||||
private val keyStore: KeyStore,
|
||||
private val keyStorePrivateKeyPassword: String,
|
||||
private val trustStore: KeyStore,
|
||||
private val crlCheckSoftFail: Boolean,
|
||||
private val maxMessageSize: Int,
|
||||
private val trace: Boolean = false,
|
||||
private val sharedThreadPool: EventLoopGroup? = null,
|
||||
private val socksProxyConfig: SocksProxyConfig? = null) : AutoCloseable {
|
||||
private val configuration: AMQPConfiguration,
|
||||
private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable {
|
||||
companion object {
|
||||
init {
|
||||
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
|
||||
@ -148,18 +139,19 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
||||
private class ClientChannelInitializer(val parent: AMQPClient) : ChannelInitializer<SocketChannel>() {
|
||||
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
|
||||
private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
|
||||
private val conf = parent.configuration
|
||||
|
||||
init {
|
||||
keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword.toCharArray())
|
||||
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(parent.trustStore, parent.crlCheckSoftFail))
|
||||
keyManagerFactory.init(conf.keyStore, conf.keyStorePrivateKeyPassword)
|
||||
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, conf.crlCheckSoftFail))
|
||||
}
|
||||
|
||||
override fun initChannel(ch: SocketChannel) {
|
||||
val pipeline = ch.pipeline()
|
||||
val socksConfig = parent.socksProxyConfig
|
||||
val socksConfig = conf.socksProxyConfig
|
||||
if (socksConfig != null) {
|
||||
val proxyAddress = InetSocketAddress(socksConfig.proxyAddress.host, socksConfig.proxyAddress.port)
|
||||
val proxy = when (parent.socksProxyConfig!!.version) {
|
||||
val proxy = when (conf.socksProxyConfig!!.version) {
|
||||
SocksProxyVersion.SOCKS4 -> {
|
||||
Socks4ProxyHandler(proxyAddress, socksConfig.userName)
|
||||
}
|
||||
@ -178,12 +170,12 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
||||
val target = parent.currentTarget
|
||||
val handler = createClientSslHelper(target, keyManagerFactory, trustManagerFactory)
|
||||
pipeline.addLast("sslHandler", handler)
|
||||
if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
|
||||
if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
|
||||
pipeline.addLast(AMQPChannelHandler(false,
|
||||
parent.allowedRemoteLegalNames,
|
||||
parent.userName,
|
||||
parent.password,
|
||||
parent.trace,
|
||||
conf.userName,
|
||||
conf.password,
|
||||
conf.trace,
|
||||
{
|
||||
parent.retryInterval = MIN_RETRY_INTERVAL // reset to fast reconnect if we connect properly
|
||||
parent._onConnection.onNext(it.second)
|
||||
@ -251,7 +243,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
||||
topic: String,
|
||||
destinationLegalName: String,
|
||||
properties: Map<String, Any?>): SendableMessage {
|
||||
requireMessageSize(payload.size, maxMessageSize)
|
||||
requireMessageSize(payload.size, configuration.maxMessageSize)
|
||||
return SendableMessageImpl(payload, topic, destinationLegalName, currentTarget, properties)
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,63 @@
|
||||
package net.corda.nodeapi.internal.protonwrapper.netty
|
||||
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import java.security.KeyStore
|
||||
|
||||
interface AMQPConfiguration {
|
||||
/**
|
||||
* SASL User name presented during protocol handshake. No SASL login if NULL.
|
||||
* For legacy interoperability with Artemis authorisation we typically require this to be "PEER_USER"
|
||||
*/
|
||||
@JvmDefault
|
||||
val userName: String?
|
||||
get() = ArtemisMessagingComponent.PEER_USER
|
||||
|
||||
/**
|
||||
* SASL plain text password presented during protocol handshake. No SASL login if NULL.
|
||||
* For legacy interoperability with Artemis authorisation we typically require this to be "PEER_USER"
|
||||
*/
|
||||
@JvmDefault
|
||||
val password: String?
|
||||
get() = ArtemisMessagingComponent.PEER_USER
|
||||
|
||||
/**
|
||||
* The keystore used for TLS connections
|
||||
*/
|
||||
val keyStore: KeyStore
|
||||
|
||||
/**
|
||||
* Password used to unlock TLS private keys in the KeyStore.
|
||||
*/
|
||||
val keyStorePrivateKeyPassword: CharArray
|
||||
|
||||
/**
|
||||
* The trust root KeyStore to validate the peer certificates against
|
||||
*/
|
||||
val trustStore: KeyStore
|
||||
|
||||
/**
|
||||
* Setting crlCheckSoftFail to true allows certificate paths where some leaf certificates do not contain cRLDistributionPoints
|
||||
* and also allows validation to continue if the CRL distribution server is not contactable.
|
||||
*/
|
||||
@JvmDefault
|
||||
val crlCheckSoftFail: Boolean
|
||||
get() = true
|
||||
|
||||
/**
|
||||
* Enables full debug tracing of all netty and AMQP level packets. This logs aat very high volume and is only for developers.
|
||||
*/
|
||||
@JvmDefault
|
||||
val trace: Boolean
|
||||
get() = false
|
||||
|
||||
/**
|
||||
* The maximum allowed size for packets, which will be dropped ahead of send. In future may also be enforced on receive,
|
||||
* but currently that is deferred to Artemis and the bridge code.
|
||||
*/
|
||||
val maxMessageSize: Int
|
||||
|
||||
@JvmDefault
|
||||
val socksProxyConfig: SocksProxyConfig?
|
||||
get() = null
|
||||
}
|
||||
|
@ -33,7 +33,6 @@ import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.net.BindException
|
||||
import java.net.InetSocketAddress
|
||||
import java.security.KeyStore
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
@ -46,14 +45,7 @@ import kotlin.concurrent.withLock
|
||||
*/
|
||||
class AMQPServer(val hostName: String,
|
||||
val port: Int,
|
||||
private val userName: String?,
|
||||
private val password: String?,
|
||||
private val keyStore: KeyStore,
|
||||
private val keyStorePrivateKeyPassword: CharArray,
|
||||
private val trustStore: KeyStore,
|
||||
private val crlCheckSoftFail: Boolean,
|
||||
private val maxMessageSize: Int,
|
||||
private val trace: Boolean = false) : AutoCloseable {
|
||||
private val configuration: AMQPConfiguration) : AutoCloseable {
|
||||
|
||||
companion object {
|
||||
init {
|
||||
@ -72,36 +64,26 @@ class AMQPServer(val hostName: String,
|
||||
private var serverChannel: Channel? = null
|
||||
private val clientChannels = ConcurrentHashMap<InetSocketAddress, SocketChannel>()
|
||||
|
||||
constructor(hostName: String,
|
||||
port: Int,
|
||||
userName: String?,
|
||||
password: String?,
|
||||
keyStore: KeyStore,
|
||||
keyStorePrivateKeyPassword: String,
|
||||
trustStore: KeyStore,
|
||||
crlCheckSoftFail: Boolean,
|
||||
maxMessageSize: Int,
|
||||
trace: Boolean = false) : this(hostName, port, userName, password, keyStore, keyStorePrivateKeyPassword.toCharArray(), trustStore, crlCheckSoftFail, maxMessageSize, trace)
|
||||
|
||||
private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() {
|
||||
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
|
||||
private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
|
||||
private val conf = parent.configuration
|
||||
|
||||
init {
|
||||
keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword)
|
||||
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(parent.trustStore, parent.crlCheckSoftFail))
|
||||
keyManagerFactory.init(conf.keyStore, conf.keyStorePrivateKeyPassword)
|
||||
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, conf.crlCheckSoftFail))
|
||||
}
|
||||
|
||||
override fun initChannel(ch: SocketChannel) {
|
||||
val pipeline = ch.pipeline()
|
||||
val handler = createServerSslHelper(keyManagerFactory, trustManagerFactory)
|
||||
pipeline.addLast("sslHandler", handler)
|
||||
if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
|
||||
if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
|
||||
pipeline.addLast(AMQPChannelHandler(true,
|
||||
null,
|
||||
parent.userName,
|
||||
parent.password,
|
||||
parent.trace,
|
||||
conf.userName,
|
||||
conf.password,
|
||||
conf.trace,
|
||||
{
|
||||
parent.clientChannels[it.first.remoteAddress()] = it.first
|
||||
parent._onConnection.onNext(it.second)
|
||||
@ -169,7 +151,7 @@ class AMQPServer(val hostName: String,
|
||||
destinationLegalName: String,
|
||||
destinationLink: NetworkHostAndPort,
|
||||
properties: Map<String, Any?>): SendableMessage {
|
||||
requireMessageSize(payload.size, maxMessageSize)
|
||||
requireMessageSize(payload.size, configuration.maxMessageSize)
|
||||
val dest = InetSocketAddress(destinationLink.host, destinationLink.port)
|
||||
require(dest in clientChannels.keys) {
|
||||
"Destination not available"
|
||||
|
@ -16,13 +16,17 @@ import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.config.EnterpriseConfiguration
|
||||
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
|
||||
import net.corda.nodeapi.internal.bridging.BridgeManager
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
@ -35,6 +39,7 @@ import org.junit.Ignore
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.security.KeyStore
|
||||
import java.util.*
|
||||
import kotlin.system.measureNanoTime
|
||||
import kotlin.system.measureTimeMillis
|
||||
@ -280,7 +285,6 @@ class AMQPBridgeTest {
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(targetAdress).whenever(it).p2pAddress
|
||||
doReturn("").whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
@ -302,16 +306,16 @@ class AMQPBridgeTest {
|
||||
}
|
||||
serverConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = serverConfig.loadSslKeyStore().internal
|
||||
override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = serverConfig.loadTrustStore().internal
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
}
|
||||
return AMQPServer("0.0.0.0",
|
||||
amqpPort,
|
||||
ArtemisMessagingComponent.PEER_USER,
|
||||
ArtemisMessagingComponent.PEER_USER,
|
||||
serverConfig.loadSslKeyStore().internal,
|
||||
serverConfig.keyStorePassword,
|
||||
serverConfig.loadTrustStore().internal,
|
||||
crlCheckSoftFail = true,
|
||||
trace = true,
|
||||
maxMessageSize = maxMessageSize
|
||||
amqpConfig
|
||||
)
|
||||
}
|
||||
}
|
@ -13,11 +13,11 @@ import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.crypto.*
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.internal.DEV_INTERMEDIATE_CA
|
||||
@ -46,6 +46,7 @@ import java.io.Closeable
|
||||
import java.math.BigInteger
|
||||
import java.net.InetSocketAddress
|
||||
import java.security.KeyPair
|
||||
import java.security.KeyStore
|
||||
import java.security.PrivateKey
|
||||
import java.security.Security
|
||||
import java.security.cert.X509CRL
|
||||
@ -244,7 +245,7 @@ class CertificateRevocationListNodeTests {
|
||||
@Test
|
||||
fun `AMPQ Client to Server connection succeeds when CRL cannot be obtained and soft fail is enabled`() {
|
||||
val crlCheckSoftFail = true
|
||||
val (amqpServer, serverCert) = createServer(
|
||||
val (amqpServer, _) = createServer(
|
||||
serverPort,
|
||||
crlCheckSoftFail = crlCheckSoftFail,
|
||||
nodeCrlDistPoint = "http://${server.hostAndPort}/crl/invalid.crl")
|
||||
@ -335,16 +336,18 @@ class CertificateRevocationListNodeTests {
|
||||
val nodeCert = clientConfig.recreateNodeCaAndTlsCertificates(nodeCrlDistPoint, tlsCrlDistPoint)
|
||||
val clientTruststore = clientConfig.loadTrustStore().internal
|
||||
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
||||
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = clientTruststore
|
||||
override val crlCheckSoftFail: Boolean = crlCheckSoftFail
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
}
|
||||
return Pair(AMQPClient(
|
||||
listOf(NetworkHostAndPort("localhost", targetPort)),
|
||||
setOf(ALICE_NAME, CHARLIE_NAME),
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
clientKeystore,
|
||||
clientConfig.keyStorePassword,
|
||||
clientTruststore,
|
||||
crlCheckSoftFail,
|
||||
maxMessageSize = maxMessageSize), nodeCert)
|
||||
amqpConfig), nodeCert)
|
||||
}
|
||||
|
||||
private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME,
|
||||
@ -363,16 +366,17 @@ class CertificateRevocationListNodeTests {
|
||||
val nodeCert = serverConfig.recreateNodeCaAndTlsCertificates(nodeCrlDistPoint, tlsCrlDistPoint)
|
||||
val serverTruststore = serverConfig.loadTrustStore().internal
|
||||
val serverKeystore = serverConfig.loadSslKeyStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = serverKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = serverTruststore
|
||||
override val crlCheckSoftFail: Boolean = crlCheckSoftFail
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
}
|
||||
return Pair(AMQPServer(
|
||||
"0.0.0.0",
|
||||
port,
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
serverKeystore,
|
||||
serverConfig.keyStorePassword,
|
||||
serverTruststore,
|
||||
crlCheckSoftFail,
|
||||
maxMessageSize = maxMessageSize), nodeCert)
|
||||
amqpConfig), nodeCert)
|
||||
}
|
||||
|
||||
private fun SSLConfiguration.recreateNodeCaAndTlsCertificates(nodeCaCrlDistPoint: String, tlsCrlDistPoint: String?): X509Certificate {
|
||||
|
@ -18,17 +18,20 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.config.EnterpriseConfiguration
|
||||
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.CIPHER_SUITES
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.createDevKeyStores
|
||||
import net.corda.nodeapi.internal.crypto.*
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.internal.createDevIntermediateCaCertPath
|
||||
@ -39,6 +42,7 @@ import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.security.KeyStore
|
||||
import java.security.SecureRandom
|
||||
import java.security.cert.X509Certificate
|
||||
import javax.net.ssl.*
|
||||
@ -413,18 +417,19 @@ class ProtonWrapperTests {
|
||||
|
||||
val clientTruststore = clientConfig.loadTrustStore().internal
|
||||
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = clientTruststore
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
}
|
||||
return AMQPClient(
|
||||
listOf(NetworkHostAndPort("localhost", serverPort),
|
||||
NetworkHostAndPort("localhost", serverPort2),
|
||||
NetworkHostAndPort("localhost", artemisPort)),
|
||||
setOf(ALICE_NAME, CHARLIE_NAME),
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
clientKeystore,
|
||||
clientConfig.keyStorePassword,
|
||||
clientTruststore,
|
||||
true,
|
||||
maxMessageSize = maxMessageSize)
|
||||
amqpConfig)
|
||||
}
|
||||
|
||||
private fun createSharedThreadsClient(sharedEventGroup: EventLoopGroup, id: Int, maxMessageSize: Int = MAX_MESSAGE_SIZE): AMQPClient {
|
||||
@ -439,17 +444,18 @@ class ProtonWrapperTests {
|
||||
|
||||
val clientTruststore = clientConfig.loadTrustStore().internal
|
||||
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = clientTruststore
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
}
|
||||
return AMQPClient(
|
||||
listOf(NetworkHostAndPort("localhost", serverPort)),
|
||||
setOf(ALICE_NAME),
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
clientKeystore,
|
||||
clientConfig.keyStorePassword,
|
||||
clientTruststore,
|
||||
true,
|
||||
sharedThreadPool = sharedEventGroup,
|
||||
maxMessageSize = maxMessageSize)
|
||||
amqpConfig,
|
||||
sharedThreadPool = sharedEventGroup)
|
||||
}
|
||||
|
||||
|
||||
@ -467,15 +473,16 @@ class ProtonWrapperTests {
|
||||
|
||||
val serverTruststore = serverConfig.loadTrustStore().internal
|
||||
val serverKeystore = serverConfig.loadSslKeyStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = serverKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = serverTruststore
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
}
|
||||
return AMQPServer(
|
||||
"0.0.0.0",
|
||||
port,
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
serverKeystore,
|
||||
serverConfig.keyStorePassword,
|
||||
serverTruststore,
|
||||
crlCheckSoftFail = true,
|
||||
maxMessageSize = maxMessageSize)
|
||||
amqpConfig)
|
||||
}
|
||||
}
|
||||
|
@ -24,16 +24,15 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.config.EnterpriseConfiguration
|
||||
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyVersion
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.*
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
@ -43,6 +42,7 @@ import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.security.KeyStore
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class SocksTests {
|
||||
@ -281,7 +281,6 @@ class SocksTests {
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
@ -304,20 +303,20 @@ class SocksTests {
|
||||
|
||||
val clientTruststore = clientConfig.loadTrustStore().internal
|
||||
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = clientTruststore
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||
override val socksProxyConfig: SocksProxyConfig? = SocksProxyConfig(SocksProxyVersion.SOCKS5, NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
||||
}
|
||||
return AMQPClient(
|
||||
listOf(NetworkHostAndPort("localhost", serverPort),
|
||||
NetworkHostAndPort("localhost", serverPort2),
|
||||
NetworkHostAndPort("localhost", artemisPort)),
|
||||
setOf(ALICE_NAME, CHARLIE_NAME),
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
clientKeystore,
|
||||
clientConfig.keyStorePassword,
|
||||
clientTruststore, true,
|
||||
MAX_MESSAGE_SIZE,
|
||||
socksProxyConfig = SocksProxyConfig(SocksProxyVersion.SOCKS5,
|
||||
NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
||||
)
|
||||
amqpConfig)
|
||||
}
|
||||
|
||||
private fun createSharedThreadsClient(sharedEventGroup: EventLoopGroup, id: Int): AMQPClient {
|
||||
@ -331,21 +330,20 @@ class SocksTests {
|
||||
|
||||
val clientTruststore = clientConfig.loadTrustStore().internal
|
||||
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = clientTruststore
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||
override val socksProxyConfig: SocksProxyConfig? = SocksProxyConfig(SocksProxyVersion.SOCKS5, NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
||||
}
|
||||
|
||||
return AMQPClient(
|
||||
listOf(NetworkHostAndPort("localhost", serverPort)),
|
||||
setOf(ALICE_NAME),
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
clientKeystore,
|
||||
clientConfig.keyStorePassword,
|
||||
clientTruststore,
|
||||
true,
|
||||
MAX_MESSAGE_SIZE,
|
||||
true,
|
||||
sharedEventGroup,
|
||||
socksProxyConfig = SocksProxyConfig(SocksProxyVersion.SOCKS5,
|
||||
NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
||||
)
|
||||
amqpConfig,
|
||||
sharedEventGroup)
|
||||
}
|
||||
|
||||
private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME): AMQPServer {
|
||||
@ -359,15 +357,16 @@ class SocksTests {
|
||||
|
||||
val serverTruststore = serverConfig.loadTrustStore().internal
|
||||
val serverKeystore = serverConfig.loadSslKeyStore().internal
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = serverKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = serverTruststore
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||
}
|
||||
return AMQPServer(
|
||||
"0.0.0.0",
|
||||
port,
|
||||
PEER_USER,
|
||||
PEER_USER,
|
||||
serverKeystore,
|
||||
serverConfig.keyStorePassword,
|
||||
serverTruststore,
|
||||
true,
|
||||
MAX_MESSAGE_SIZE)
|
||||
amqpConfig)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user