AMQP Bridging between nodes (#2181)

* Able to send hand coded messages to an Artemis node inbox

Get startup race condition fixed. Start cleanup work.

Fixup after rebase

Remove SASL hack for now

Minor tweaks. Enable AMQP mode manually.

Add configuration control

Slight clean up

Stop timeouts that don't work with AMQP

Rename class

Get TLS constants from :node-api

Primitive integration test

Put back commented line

Session per bridge to alow rollback on remote rejects.

Add more tests and handle multiple IP adddresses

Reduce logging

Fixup after rebase

Add a test to verify the remote end AMQP rejection logic works and does cause message replay.

Allow Artemis to duplicate after session rollback

Reduce number of threads

Move legacy bridge related code over to CoreBridgeManager

Shared threadpool for bridges

Add a test to confirm that no side effects when using a shared thread pool.

Address PR comments and remove dead lines

Rebase and add some comments

Remove a couple of blank lines

Ensure AMQP bridges are used in tests

Fixup after removal of testNodeConfiguration

Add a couple of doc comments

Add a couple of doc comments

Make things internal and use CordaFuture

Address some PR comments

Change comment type

* Use Artemis 2.2 to fix AMQP problems. Add explicit test of legacy core bridges, as marking the factory class private had silently broken them.

* Fix change due to using Artemis 2.2
This commit is contained in:
Matthew Nesbit 2017-12-15 17:48:33 +00:00 committed by GitHub
parent 02ad2b8b60
commit 595d41af04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 2386 additions and 160 deletions

View File

@ -25,8 +25,7 @@ buildscript {
* TODO Upgrade to version 2.4 for large message streaming support
*
* Due to a memory leak in the connection handling code in Artemis, we are
* temporarily downgrading to version 2.1.0 (version used prior to the 2.4
* bump).
* temporarily downgrading to version 2.2.0.
*
* The memory leak essentially triggers an out-of-memory exception within
* less than 10 seconds and can take down a node if a non-TLS connection is
@ -35,7 +34,7 @@ buildscript {
* The issue has been reported to upstream:
* https://issues.apache.org/jira/browse/ARTEMIS-1559
*/
ext.artemis_version = '2.1.0'
ext.artemis_version = '2.2.0'
ext.jackson_version = '2.9.2'
ext.jetty_version = '9.4.7.v20170914'
ext.jersey_version = '2.25'

View File

@ -9,10 +9,10 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.internal.RPCDriverDSL
import net.corda.testing.node.internal.rpcDriver
import net.corda.testing.internal.testThreadFactory
import org.apache.activemq.artemis.utils.ConcurrentHashSet
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet
import org.junit.After
import org.junit.Test
import org.junit.runner.RunWith

View File

@ -127,6 +127,9 @@ UNRELEASED
* Values for the ``database.transactionIsolationLevel`` config now follow the ``java.sql.Connection`` int constants but
without the "TRANSACTION_" prefix, i.e. "NONE", "READ_UNCOMMITTED", etc.
* Peer-to-peer communications is now via AMQP 1.0 as default.
Although the legacy Artemis CORE bridging can still be used by setting the ``useAMQPBridges`` configuration property to false.
.. _changelog_v1:
Release 1.0

View File

@ -172,4 +172,7 @@ path to the node's base directory.
:port: The port to start SSH server on
:exportJMXTo: If set to ``http``, will enable JMX metrics reporting via the Jolokia HTTP/JSON agent.
Default Jolokia access url is http://127.0.0.1:7005/jolokia/
Default Jolokia access url is http://127.0.0.1:7005/jolokia/
:useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications.
Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes.

View File

@ -29,12 +29,14 @@ class ArtemisTcpTransport {
// but we allow classical RSA certificates to work in case:
// a) we need to use keytool certificates in some demos,
// b) we use cloud providers or HSMs that do not support ECC.
private val CIPHER_SUITES = listOf(
val CIPHER_SUITES = listOf(
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_DHE_RSA_WITH_AES_128_GCM_SHA256"
)
val TLS_VERSIONS = listOf("TLSv1.2")
fun tcpTransport(
direction: ConnectionDirection,
hostAndPort: NetworkHostAndPort,
@ -68,7 +70,7 @@ class ArtemisTcpTransport {
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to config.trustStoreFile,
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(","),
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true,
VERIFY_PEER_LEGAL_NAME to (direction as? ConnectionDirection.Outbound)?.expectedCommonNames
)

View File

@ -0,0 +1,241 @@
package net.corda.node.amqp
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.toBase58String
import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.config.*
import net.corda.node.services.messaging.ArtemisMessagingClient
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.crypto.loadKeyStore
import net.corda.testing.*
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Assert.assertArrayEquals
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.Observable
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
class AMQPBridgeTest {
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
private val ALICE = TestIdentity(ALICE_NAME)
private val BOB = TestIdentity(BOB_NAME)
private val artemisPort = freePort()
private val artemisPort2 = freePort()
private val amqpPort = freePort()
private val artemisAddress = NetworkHostAndPort("localhost", artemisPort)
private val artemisAddress2 = NetworkHostAndPort("localhost", artemisPort2)
private val amqpAddress = NetworkHostAndPort("localhost", amqpPort)
private abstract class AbstractNodeConfiguration : NodeConfiguration
@Test
fun `test acked and nacked messages`() {
// Create local queue
val sourceQueueName = "internal.peers." + BOB.publicKey.toBase58String()
val (artemisServer, artemisClient) = createArtemis(sourceQueueName)
// Pre-populate local queue with 3 messages
val artemis = artemisClient.started!!
for (i in 0 until 3) {
val artemisMessage = artemis.session.createMessage(true).apply {
putIntProperty("CountProp", i)
writeBodyBufferBytes("Test$i".toByteArray())
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
artemis.producer.send(sourceQueueName, artemisMessage)
}
//Create target server
val amqpServer = createAMQPServer()
val receive = amqpServer.onReceive.toBlocking().iterator
amqpServer.start()
val received1 = receive.next()
val messageID1 = received1.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload)
assertEquals(0, messageID1)
received1.complete(true) // Accept first message
val received2 = receive.next()
val messageID2 = received2.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload)
assertEquals(1, messageID2)
received2.complete(false) // Reject message
while (true) {
val received3 = receive.next()
val messageID3 = received3.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload)
assertNotEquals(0, messageID3)
if (messageID3 != 1) { // keep rejecting any batched items following rejection
received3.complete(false)
} else { // beginnings of replay so accept again
received3.complete(true)
break
}
}
while (true) {
val received4 = receive.next()
val messageID4 = received4.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload)
if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip
assertEquals(2, messageID4) // next message should be in order though
break
}
received4.complete(true)
}
// Send a fresh item and check receive
val artemisMessage = artemis.session.createMessage(true).apply {
putIntProperty("CountProp", -1)
writeBodyBufferBytes("Test_end".toByteArray())
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
artemis.producer.send(sourceQueueName, artemisMessage)
val received5 = receive.next()
val messageID5 = received5.applicationProperties["CountProp"] as Int
assertArrayEquals("Test_end".toByteArray(), received5.payload)
assertEquals(-1, messageID5) // next message should be in order
received5.complete(true)
amqpServer.stop()
artemisClient.stop()
artemisServer.stop()
}
@Test
fun `Test legacy bridge still works`() {
// Create local queue
val sourceQueueName = "internal.peers." + ALICE.publicKey.toBase58String()
val (artemisLegacyServer, artemisLegacyClient) = createLegacyArtemis(sourceQueueName)
val (artemisServer, artemisClient) = createArtemis(null)
val artemis = artemisLegacyClient.started!!
for (i in 0 until 3) {
val artemisMessage = artemis.session.createMessage(true).apply {
putIntProperty("CountProp", i)
writeBodyBufferBytes("Test$i".toByteArray())
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
artemis.producer.send(sourceQueueName, artemisMessage)
}
val subs = artemisClient.started!!.session.createConsumer(P2P_QUEUE)
for (i in 0 until 3) {
val msg = subs.receive()
val messageBody = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
assertArrayEquals("Test$i".toByteArray(), messageBody)
assertEquals(i, msg.getIntProperty("CountProp"))
}
artemisClient.stop()
artemisServer.stop()
artemisLegacyClient.stop()
artemisLegacyServer.stop()
}
private fun createArtemis(sourceQueueName: String?): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory
doReturn(ALICE_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn("").whenever(it).exportJMXto
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(true).whenever(it).useAMQPBridges
}
artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCache>().also {
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any())
}
val userService = rigorousMock<RPCSecurityManager>()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()
val artemis = artemisClient.started!!
if (sourceQueueName != null) {
// Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true)
}
return Pair(artemisServer, artemisClient)
}
private fun createLegacyArtemis(sourceQueueName: String): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "artemis2").whenever(it).baseDirectory
doReturn(BOB_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn("").whenever(it).exportJMXto
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(false).whenever(it).useAMQPBridges
doReturn(ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))).whenever(it).activeMQServer
}
artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCache>().also {
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any())
}
val userService = rigorousMock<RPCSecurityManager>()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, null, networkMap, userService, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress2, MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()
val artemis = artemisClient.started!!
// Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true)
return Pair(artemisServer, artemisClient)
}
private fun createAMQPServer(): AMQPServer {
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "server").whenever(it).baseDirectory
doReturn(BOB_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
}
serverConfig.configureWithDevSSLCertificate()
val serverTruststore = loadKeyStore(serverConfig.trustStoreFile, serverConfig.trustStorePassword)
val serverKeystore = loadKeyStore(serverConfig.sslKeystore, serverConfig.keyStorePassword)
val amqpServer = AMQPServer("0.0.0.0",
amqpPort,
ArtemisMessagingComponent.PEER_USER,
ArtemisMessagingComponent.PEER_USER,
serverKeystore,
serverConfig.keyStorePassword,
serverTruststore,
trace = true)
return amqpServer
}
}

View File

@ -0,0 +1,308 @@
package net.corda.node.amqp
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.div
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.netty.AMQPClient
import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingClient
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.crypto.loadKeyStore
import net.corda.testing.*
import org.apache.activemq.artemis.api.core.RoutingType
import org.junit.Assert.assertArrayEquals
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.Observable.never
import kotlin.test.assertEquals
class ProtonWrapperTests {
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
private val serverPort = freePort()
private val serverPort2 = freePort()
private val artemisPort = freePort()
private abstract class AbstractNodeConfiguration : NodeConfiguration
@Test
fun `Simple AMPQ Client to Server`() {
val amqpServer = createServer(serverPort)
amqpServer.use {
amqpServer.start()
val receiveSubs = amqpServer.onReceive.subscribe {
assertEquals(BOB_NAME.toString(), it.sourceLegalName)
assertEquals("p2p.inbound", it.topic)
assertEquals("Test", String(it.payload))
it.complete(true)
}
val amqpClient = createClient()
amqpClient.use {
val serverConnected = amqpServer.onConnection.toFuture()
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val serverConnect = serverConnected.get()
assertEquals(true, serverConnect.connected)
assertEquals(BOB_NAME, CordaX500Name.parse(serverConnect.remoteCert!!.subject.toString()))
val clientConnect = clientConnected.get()
assertEquals(true, clientConnect.connected)
assertEquals(ALICE_NAME, CordaX500Name.parse(clientConnect.remoteCert!!.subject.toString()))
val msg = amqpClient.createMessage("Test".toByteArray(),
"p2p.inbound",
ALICE_NAME.toString(),
emptyMap())
amqpClient.write(msg)
assertEquals(MessageStatus.Acknowledged, msg.onComplete.get())
receiveSubs.unsubscribe()
}
}
}
@Test
fun `AMPQ Client refuses to connect to unexpected server`() {
val amqpServer = createServer(serverPort, CordaX500Name("Rogue 1", "London", "GB"))
amqpServer.use {
amqpServer.start()
val amqpClient = createClient()
amqpClient.use {
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val clientConnect = clientConnected.get()
assertEquals(false, clientConnect.connected)
}
}
}
@Test
fun `Client Failover for multiple IP`() {
val amqpServer = createServer(serverPort)
val amqpServer2 = createServer(serverPort2)
val amqpClient = createClient()
try {
val serverConnected = amqpServer.onConnection.toFuture()
val serverConnected2 = amqpServer2.onConnection.toFuture()
val clientConnected = amqpClient.onConnection.toBlocking().iterator
amqpServer.start()
amqpClient.start()
val serverConn1 = serverConnected.get()
assertEquals(true, serverConn1.connected)
assertEquals(BOB_NAME, CordaX500Name.parse(serverConn1.remoteCert!!.subject.toString()))
val connState1 = clientConnected.next()
assertEquals(true, connState1.connected)
assertEquals(ALICE_NAME, CordaX500Name.parse(connState1.remoteCert!!.subject.toString()))
assertEquals(serverPort, connState1.remoteAddress.port)
// Fail over
amqpServer2.start()
amqpServer.stop()
val connState2 = clientConnected.next()
assertEquals(false, connState2.connected)
assertEquals(serverPort, connState2.remoteAddress.port)
val serverConn2 = serverConnected2.get()
assertEquals(true, serverConn2.connected)
assertEquals(BOB_NAME, CordaX500Name.parse(serverConn2.remoteCert!!.subject.toString()))
val connState3 = clientConnected.next()
assertEquals(true, connState3.connected)
assertEquals(ALICE_NAME, CordaX500Name.parse(connState3.remoteCert!!.subject.toString()))
assertEquals(serverPort2, connState3.remoteAddress.port)
// Fail back
amqpServer.start()
amqpServer2.stop()
val connState4 = clientConnected.next()
assertEquals(false, connState4.connected)
assertEquals(serverPort2, connState4.remoteAddress.port)
val serverConn3 = serverConnected.get()
assertEquals(true, serverConn3.connected)
assertEquals(BOB_NAME, CordaX500Name.parse(serverConn3.remoteCert!!.subject.toString()))
val connState5 = clientConnected.next()
assertEquals(true, connState5.connected)
assertEquals(ALICE_NAME, CordaX500Name.parse(connState5.remoteCert!!.subject.toString()))
assertEquals(serverPort, connState5.remoteAddress.port)
} finally {
amqpClient.close()
amqpServer.close()
amqpServer2.close()
}
}
@Test
fun `Send a message from AMQP to Artemis inbox`() {
val (server, artemisClient) = createArtemisServerAndClient()
val amqpClient = createClient()
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
assertEquals(true, clientConnected.get().connected)
assertEquals(CHARLIE_NAME, CordaX500Name.parse(clientConnected.get().remoteCert!!.subject.toString()))
val artemis = artemisClient.started!!
val sendAddress = "p2p.inbound"
artemis.session.createQueue(sendAddress, RoutingType.MULTICAST, "queue", true)
val consumer = artemis.session.createConsumer("queue")
val testData = "Test".toByteArray()
val testProperty = mutableMapOf<Any?, Any?>()
testProperty["TestProp"] = "1"
val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty)
amqpClient.write(message)
assertEquals(MessageStatus.Acknowledged, message.onComplete.get())
val received = consumer.receive()
assertEquals("1", received.getStringProperty("TestProp"))
assertArrayEquals(testData, ByteArray(received.bodySize).apply { received.bodyBuffer.readBytes(this) })
amqpClient.stop()
artemisClient.stop()
server.stop()
}
@Test
fun `shared AMQPClient threadpool tests`() {
val amqpServer = createServer(serverPort)
amqpServer.use {
val connectionEvents = amqpServer.onConnection.toBlocking().iterator
amqpServer.start()
val sharedThreads = NioEventLoopGroup()
val amqpClient1 = createSharedThreadsClient(sharedThreads, 0)
val amqpClient2 = createSharedThreadsClient(sharedThreads, 1)
amqpClient1.start()
val connection1 = connectionEvents.next()
assertEquals(true, connection1.connected)
val connection1ID = CordaX500Name.parse(connection1.remoteCert!!.subject.toString())
assertEquals("client 0", connection1ID.organisationUnit)
val source1 = connection1.remoteAddress
amqpClient2.start()
val connection2 = connectionEvents.next()
assertEquals(true, connection2.connected)
val connection2ID = CordaX500Name.parse(connection2.remoteCert!!.subject.toString())
assertEquals("client 1", connection2ID.organisationUnit)
val source2 = connection2.remoteAddress
// Stopping one shouldn't disconnect the other
amqpClient1.stop()
val connection3 = connectionEvents.next()
assertEquals(false, connection3.connected)
assertEquals(source1, connection3.remoteAddress)
assertEquals(false, amqpClient1.connected)
assertEquals(true, amqpClient2.connected)
// Now shutdown both
amqpClient2.stop()
val connection4 = connectionEvents.next()
assertEquals(false, connection4.connected)
assertEquals(source2, connection4.remoteAddress)
assertEquals(false, amqpClient1.connected)
assertEquals(false, amqpClient2.connected)
// Now restarting one should work
amqpClient1.start()
val connection5 = connectionEvents.next()
assertEquals(true, connection5.connected)
val connection5ID = CordaX500Name.parse(connection5.remoteCert!!.subject.toString())
assertEquals("client 0", connection5ID.organisationUnit)
assertEquals(true, amqpClient1.connected)
assertEquals(false, amqpClient2.connected)
// Cleanup
amqpClient1.stop()
sharedThreads.shutdownGracefully()
sharedThreads.terminationFuture().sync()
}
}
private fun createArtemisServerAndClient(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory
doReturn(CHARLIE_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn("").whenever(it).exportJMXto
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(true).whenever(it).useAMQPBridges
}
artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCache>().also {
doReturn(never<NetworkMapCache.MapChange>()).whenever(it).changed
}
val userService = rigorousMock<RPCSecurityManager>()
val server = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE)
val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE)
server.start()
client.start()
return Pair(server, client)
}
private fun createClient(): AMQPClient {
val clientConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "client").whenever(it).baseDirectory
doReturn(BOB_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
}
clientConfig.configureWithDevSSLCertificate()
val clientTruststore = loadKeyStore(clientConfig.trustStoreFile, clientConfig.trustStorePassword)
val clientKeystore = loadKeyStore(clientConfig.sslKeystore, clientConfig.keyStorePassword)
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort),
NetworkHostAndPort("localhost", serverPort2),
NetworkHostAndPort("localhost", artemisPort)),
setOf(ALICE_NAME, CHARLIE_NAME),
PEER_USER,
PEER_USER,
clientKeystore,
clientConfig.keyStorePassword,
clientTruststore, true)
return amqpClient
}
private fun createSharedThreadsClient(sharedEventGroup: EventLoopGroup, id: Int): AMQPClient {
val clientConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "client_%$id").whenever(it).baseDirectory
doReturn(CordaX500Name(null, "client $id", "Corda", "London", null, "GB")).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
}
clientConfig.configureWithDevSSLCertificate()
val clientTruststore = loadKeyStore(clientConfig.trustStoreFile, clientConfig.trustStorePassword)
val clientKeystore = loadKeyStore(clientConfig.sslKeystore, clientConfig.keyStorePassword)
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort)),
setOf(ALICE_NAME),
PEER_USER,
PEER_USER,
clientKeystore,
clientConfig.keyStorePassword,
clientTruststore, true, sharedEventGroup)
return amqpClient
}
private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME): AMQPServer {
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "server").whenever(it).baseDirectory
doReturn(name).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
}
serverConfig.configureWithDevSSLCertificate()
val serverTruststore = loadKeyStore(serverConfig.trustStoreFile, serverConfig.trustStorePassword)
val serverKeystore = loadKeyStore(serverConfig.sslKeystore, serverConfig.keyStorePassword)
val amqpServer = AMQPServer("0.0.0.0",
port,
PEER_USER,
PEER_USER,
serverKeystore,
serverConfig.keyStorePassword,
serverTruststore)
return amqpServer
}
}

View File

@ -99,7 +99,7 @@ class P2PMessagingTest {
}
// Wait until the first request is received
crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
crashingNodes.firstRequestReceived.await()
// Stop alice's node after we ensured that the first request was delivered and ignored.
alice.dispose()
val numberOfRequestsReceived = crashingNodes.requestsReceived.get()
@ -109,8 +109,7 @@ class P2PMessagingTest {
// Restart the node and expect a response
val aliceRestarted = startAlice()
val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow()
assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived)
assertThat(response).isEqualTo(responseMessage)
}

View File

@ -0,0 +1,483 @@
package net.corda.node.internal.protonwrapper.engine
import io.netty.buffer.ByteBuf
import io.netty.buffer.PooledByteBufAllocator
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.messages.impl.ReceivedMessageImpl
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
import org.apache.qpid.proton.Proton
import org.apache.qpid.proton.amqp.Binary
import org.apache.qpid.proton.amqp.Symbol
import org.apache.qpid.proton.amqp.messaging.*
import org.apache.qpid.proton.amqp.messaging.Properties
import org.apache.qpid.proton.amqp.messaging.Target
import org.apache.qpid.proton.amqp.transaction.Coordinator
import org.apache.qpid.proton.amqp.transport.ErrorCondition
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode
import org.apache.qpid.proton.amqp.transport.SenderSettleMode
import org.apache.qpid.proton.engine.*
import org.apache.qpid.proton.message.Message
import org.apache.qpid.proton.message.ProtonJMessage
import org.slf4j.LoggerFactory
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.*
/**
* This ConnectionStateMachine class handles the events generated by the proton-j library to track
* various logical connection, transport and link objects and to drive packet processing.
* It is single threaded per physical SSL connection just like the proton-j library,
* but this threading lock is managed by the EventProcessor class that calls this.
* It ultimately posts application packets to/from from the netty transport pipeline.
*/
internal class ConnectionStateMachine(serverMode: Boolean,
collector: Collector,
private val localLegalName: String,
private val remoteLegalName: String,
userName: String?,
password: String?) : BaseHandler() {
companion object {
private const val IDLE_TIMEOUT = 10000
}
val connection: Connection
private val log = LoggerFactory.getLogger(localLegalName)
private val transport: Transport
private val id = UUID.randomUUID().toString()
private var session: Session? = null
private val messageQueues = mutableMapOf<String, LinkedList<SendableMessageImpl>>()
private val unackedQueue = LinkedList<SendableMessageImpl>()
private val receivers = mutableMapOf<String, Receiver>()
private val senders = mutableMapOf<String, Sender>()
private var tagId: Int = 0
init {
connection = Engine.connection()
connection.container = "CORDA:$id"
transport = Engine.transport()
transport.idleTimeout = IDLE_TIMEOUT
transport.context = connection
transport.setEmitFlowEventOnSend(true)
connection.collect(collector)
val sasl = transport.sasl()
if (userName != null) {
//TODO This handshake is required for our queue permission logic in Artemis
sasl.setMechanisms("PLAIN")
if (serverMode) {
sasl.server()
sasl.done(Sasl.PN_SASL_OK)
} else {
sasl.plain(userName, password)
sasl.client()
}
} else {
sasl.setMechanisms("ANONYMOUS")
if (serverMode) {
sasl.server()
sasl.done(Sasl.PN_SASL_OK)
} else {
sasl.client()
}
}
transport.bind(connection)
if (!serverMode) {
connection.open()
}
}
override fun onConnectionInit(event: Event) {
val connection = event.connection
log.debug { "Connection init $connection" }
}
override fun onConnectionLocalOpen(event: Event) {
val connection = event.connection
log.info("Connection local open $connection")
val session = connection.session()
session.open()
this.session = session
for (target in messageQueues.keys) {
getSender(target)
}
}
override fun onConnectionLocalClose(event: Event) {
val connection = event.connection
log.info("Connection local close $connection")
connection.close()
connection.free()
}
override fun onConnectionUnbound(event: Event) {
if (event.connection == this.connection) {
val channel = connection.context as? Channel
if (channel != null) {
if (channel.isActive) {
channel.close()
}
}
}
}
override fun onConnectionFinal(event: Event) {
val connection = event.connection
log.debug { "Connection final $connection" }
if (connection == this.connection) {
this.connection.context = null
for (queue in messageQueues.values) {
// clear any dead messages
while (true) {
val msg = queue.poll()
if (msg != null) {
msg.doComplete(MessageStatus.Rejected)
msg.release()
} else {
break
}
}
}
messageQueues.clear()
while (true) {
val msg = unackedQueue.poll()
if (msg != null) {
msg.doComplete(MessageStatus.Rejected)
msg.release()
} else {
break
}
}
// shouldn't happen, but close socket channel now if not already done
val channel = connection.context as? Channel
if (channel != null && channel.isActive) {
channel.close()
}
// shouldn't happen, but cleanup any stranded items
transport.context = null
session = null
receivers.clear()
senders.clear()
}
}
override fun onTransportHeadClosed(event: Event) {
val transport = event.transport
log.debug { "Transport Head Closed $transport" }
transport.close_tail()
}
override fun onTransportTailClosed(event: Event) {
val transport = event.transport
log.debug { "Transport Tail Closed $transport" }
transport.close_head()
}
override fun onTransportClosed(event: Event) {
val transport = event.transport
log.debug { "Transport Closed $transport" }
if (transport == this.transport) {
transport.unbind()
transport.free()
transport.context = null
}
}
override fun onTransportError(event: Event) {
val transport = event.transport
log.info("Transport Error $transport")
val condition = event.transport.condition
if (condition != null) {
log.info("Error: ${condition.description}")
} else {
log.info("Error (no description returned).")
}
}
override fun onTransport(event: Event) {
val transport = event.transport
log.debug { "Transport $transport" }
onTransportInternal(transport)
}
private fun onTransportInternal(transport: Transport) {
if (!transport.isClosed) {
val pending = transport.pending() // Note this drives frame generation, which the susbsequent writes push to the socket
if (pending > 0) {
val connection = transport.context as? Connection
val channel = connection?.context as? Channel
channel?.writeAndFlush(transport)
}
}
}
override fun onSessionInit(event: Event) {
val session = event.session
log.debug { "Session init $session" }
}
override fun onSessionLocalOpen(event: Event) {
val session = event.session
log.debug { "Session local open $session" }
}
private fun getSender(target: String): Sender {
if (!senders.containsKey(target)) {
val sender = session!!.sender(UUID.randomUUID().toString())
sender.source = Source().apply {
address = target
dynamic = false
durable = TerminusDurability.NONE
}
sender.target = Target().apply {
address = target
dynamic = false
durable = TerminusDurability.UNSETTLED_STATE
}
sender.senderSettleMode = SenderSettleMode.UNSETTLED
sender.receiverSettleMode = ReceiverSettleMode.FIRST
senders[target] = sender
sender.open()
}
return senders[target]!!
}
override fun onSessionLocalClose(event: Event) {
val session = event.session
log.debug { "Session local close $session" }
session.close()
session.free()
}
override fun onSessionFinal(event: Event) {
val session = event.session
log.debug { "Session final $session" }
if (session == this.session) {
this.session = null
}
}
override fun onLinkLocalOpen(event: Event) {
val link = event.link
if (link is Sender) {
log.debug { "Sender Link local open ${link.name} ${link.source} ${link.target}" }
senders[link.target.address] = link
transmitMessages(link)
}
if (link is Receiver) {
log.debug { "Receiver Link local open ${link.name} ${link.source} ${link.target}" }
receivers[link.target.address] = link
}
}
override fun onLinkRemoteOpen(event: Event) {
val link = event.link
if (link is Receiver) {
if (link.remoteTarget is Coordinator) {
log.debug { "Coordinator link received" }
}
}
}
override fun onLinkFinal(event: Event) {
val link = event.link
if (link is Sender) {
log.debug { "Sender Link final ${link.name} ${link.source} ${link.target}" }
senders.remove(link.target.address)
}
if (link is Receiver) {
log.debug { "Receiver Link final ${link.name} ${link.source} ${link.target}" }
receivers.remove(link.target.address)
}
}
override fun onLinkFlow(event: Event) {
val link = event.link
if (link is Sender) {
log.debug { "Sender Flow event: ${link.name} ${link.source} ${link.target}" }
if (senders.containsKey(link.target.address)) {
transmitMessages(link)
}
} else if (link is Receiver) {
log.debug { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" }
}
}
fun processTransport() {
onTransportInternal(transport)
}
private fun transmitMessages(sender: Sender) {
val messageQueue = messageQueues.getOrPut(sender.target.address, { LinkedList() })
while (sender.credit > 0) {
log.debug { "Sender credit: ${sender.credit}" }
val nextMessage = messageQueue.poll()
if (nextMessage != null) {
try {
val messageBuf = nextMessage.buf!!
val buf = ByteBuffer.allocate(4)
buf.putInt(tagId++)
val delivery = sender.delivery(buf.array())
delivery.context = nextMessage
sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes())
nextMessage.status = MessageStatus.Sent
log.debug { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" }
unackedQueue.offer(nextMessage)
sender.advance()
} finally {
nextMessage.release()
}
} else {
break
}
}
}
override fun onDelivery(event: Event) {
val delivery = event.delivery
log.debug { "Delivery $delivery" }
val link = delivery.link
if (link is Receiver) {
if (delivery.isReadable && !delivery.isPartial) {
val pending = delivery.pending()
val amqpMessage = decodeAMQPMessage(pending, link)
val payload = (amqpMessage.body as Data).value.array
val connection = event.connection
val channel = connection?.context as? Channel
if (channel != null) {
val appProperties = HashMap(amqpMessage.applicationProperties.value)
appProperties["_AMQ_VALIDATED_USER"] = remoteLegalName
val localAddress = channel.localAddress() as InetSocketAddress
val remoteAddress = channel.remoteAddress() as InetSocketAddress
val receivedMessage = ReceivedMessageImpl(
payload,
link.source.address,
remoteLegalName,
NetworkHostAndPort(localAddress.hostString, localAddress.port),
localLegalName,
NetworkHostAndPort(remoteAddress.hostString, remoteAddress.port),
appProperties,
channel,
delivery)
log.debug { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" }
channel.writeAndFlush(receivedMessage)
if (link.current() == delivery) {
link.advance()
}
} else {
delivery.disposition(Rejected())
delivery.settle()
}
}
} else if (link is Sender) {
log.debug { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" }
val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance()
val sourceMessage = delivery.context as? SendableMessageImpl
unackedQueue.remove(sourceMessage)
sourceMessage?.doComplete(if (ok) MessageStatus.Acknowledged else MessageStatus.Rejected)
delivery.settle()
}
}
private fun encodeAMQPMessage(message: ProtonJMessage): ByteBuf {
val buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500)
try {
try {
message.encode(NettyWritable(buffer))
val bytes = ByteArray(buffer.writerIndex())
buffer.readBytes(bytes)
return Unpooled.wrappedBuffer(bytes)
} catch (ex: Exception) {
log.error("Unable to encode message as AMQP packet", ex)
throw ex
}
} finally {
buffer.release()
}
}
private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf {
val message = Proton.message() as ProtonJMessage
message.body = Data(Binary(msg.payload))
message.properties = Properties()
val appProperties = HashMap(msg.applicationProperties)
//TODO We shouldn't have to do this, but Artemis Server doesn't set the header on AMQP packets.
// Fortunately, when we are bridge to bridge/bridge to float we can authenticate links there.
appProperties["_AMQ_VALIDATED_USER"] = localLegalName
message.applicationProperties = ApplicationProperties(appProperties)
return encodeAMQPMessage(message)
}
private fun decodeAMQPMessage(pending: Int, link: Receiver): Message {
val msgBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(pending)
try {
link.recv(NettyWritable(msgBuf))
val amqpMessage = Proton.message()
amqpMessage.decode(msgBuf.array(), msgBuf.arrayOffset() + msgBuf.readerIndex(), msgBuf.readableBytes())
return amqpMessage
} finally {
msgBuf.release()
}
}
fun transportWriteMessage(msg: SendableMessageImpl) {
log.debug { "Queue application message write uuid: ${msg.applicationProperties["_AMQ_DUPL_ID"]} ${javax.xml.bind.DatatypeConverter.printHexBinary(msg.payload)}" }
msg.buf = encodePayloadBytes(msg)
val messageQueue = messageQueues.getOrPut(msg.topic, { LinkedList() })
messageQueue.offer(msg)
if (session != null) {
val sender = getSender(msg.topic)
transmitMessages(sender)
}
}
fun transportProcessInput(msg: ByteBuf) {
val source = msg.nioBuffer()
try {
do {
val buffer = transport.inputBuffer
val limit = Math.min(buffer.remaining(), source.remaining())
val duplicate = source.duplicate()
duplicate.limit(source.position() + limit)
buffer.put(duplicate)
transport.processInput().checkIsOk()
source.position(source.position() + limit)
} while (source.hasRemaining())
} catch (ex: Exception) {
val condition = ErrorCondition()
condition.condition = Symbol.getSymbol("proton:io")
condition.description = ex.message
transport.condition = condition
transport.close_tail()
transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
}
}
fun transportProcessOutput(ctx: ChannelHandlerContext) {
try {
var done = false
while (!done) {
val toWrite = transport.outputBuffer
if (toWrite != null && toWrite.hasRemaining()) {
val outbound = ctx.alloc().buffer(toWrite.remaining())
outbound.writeBytes(toWrite)
ctx.write(outbound)
transport.outputConsumed()
} else {
done = true
}
}
ctx.flush()
} catch (ex: Exception) {
val condition = ErrorCondition()
condition.condition = Symbol.getSymbol("proton:io")
condition.description = ex.message
transport.condition = condition
transport.close_head()
transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
}
}
}

View File

@ -0,0 +1,136 @@
package net.corda.node.internal.protonwrapper.engine
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import net.corda.core.utilities.debug
import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.messages.impl.ReceivedMessageImpl
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
import org.apache.qpid.proton.Proton
import org.apache.qpid.proton.amqp.messaging.Accepted
import org.apache.qpid.proton.amqp.messaging.Rejected
import org.apache.qpid.proton.amqp.transport.DeliveryState
import org.apache.qpid.proton.amqp.transport.ErrorCondition
import org.apache.qpid.proton.engine.*
import org.apache.qpid.proton.engine.impl.CollectorImpl
import org.apache.qpid.proton.reactor.FlowController
import org.apache.qpid.proton.reactor.Handshaker
import org.slf4j.LoggerFactory
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* The EventProcessor class converts calls on the netty scheduler/pipeline
* into proton-j engine event calls into the ConnectionStateMachine.
* It also registers a couple of standard event processors for the basic connection handshake
* and simple sliding window flow control, so that these events don't have to live inside ConnectionStateMachine.
* Everything here is single threaded, because the proton-j library has to be run that way.
*/
internal class EventProcessor(channel: Channel,
serverMode: Boolean,
localLegalName: String,
remoteLegalName: String,
userName: String?,
password: String?) : BaseHandler() {
companion object {
private const val FLOW_WINDOW_SIZE = 10
}
private val log = LoggerFactory.getLogger(localLegalName)
private val lock = ReentrantLock()
private var pendingExecute: Boolean = false
private val executor: ScheduledExecutorService = channel.eventLoop()
private val collector = Proton.collector() as CollectorImpl
private val handlers = mutableListOf<Handler>()
private val stateMachine: ConnectionStateMachine = ConnectionStateMachine(serverMode,
collector,
localLegalName,
remoteLegalName,
userName,
password)
val connection: Connection = stateMachine.connection
init {
addHandler(Handshaker())
addHandler(FlowController(FLOW_WINDOW_SIZE))
addHandler(stateMachine)
connection.context = channel
tick(stateMachine.connection)
}
fun addHandler(handler: Handler) = handlers.add(handler)
private fun popEvent(): Event? {
var ev = collector.peek()
if (ev != null) {
ev = ev.copy() // prevent mutation by collector.pop()
collector.pop()
}
return ev
}
private fun tick(connection: Connection) {
lock.withLock {
try {
if ((connection.localState != EndpointState.CLOSED) && !connection.transport.isClosed) {
val now = System.currentTimeMillis()
val tickDelay = Math.max(0L, connection.transport.tick(now) - now)
executor.schedule({ tick(connection) }, tickDelay, TimeUnit.MILLISECONDS)
}
} catch (ex: Exception) {
connection.transport.close()
connection.condition = ErrorCondition()
}
}
}
fun processEvents() {
lock.withLock {
pendingExecute = false
log.debug { "Process Events" }
while (true) {
val ev = popEvent() ?: break
log.debug { "Process event: $ev" }
for (handler in handlers) {
handler.handle(ev)
}
}
stateMachine.processTransport()
log.debug { "Process Events Done" }
}
}
fun processEventsAsync() {
lock.withLock {
if (!pendingExecute) {
pendingExecute = true
executor.execute { processEvents() }
}
}
}
fun close() {
if (connection.localState != EndpointState.CLOSED) {
connection.close()
processEvents()
connection.free()
processEvents()
}
}
fun transportProcessInput(msg: ByteBuf) = lock.withLock { stateMachine.transportProcessInput(msg) }
fun transportProcessOutput(ctx: ChannelHandlerContext) = lock.withLock { stateMachine.transportProcessOutput(ctx) }
fun transportWriteMessage(msg: SendableMessageImpl) = lock.withLock { stateMachine.transportWriteMessage(msg) }
fun complete(completer: ReceivedMessageImpl.MessageCompleter) = lock.withLock {
val status: DeliveryState = if (completer.status == MessageStatus.Acknowledged) Accepted.getInstance() else Rejected()
completer.delivery.disposition(status)
completer.delivery.settle()
}
}

View File

@ -0,0 +1,63 @@
package net.corda.node.internal.protonwrapper.engine
import io.netty.buffer.ByteBuf
import org.apache.qpid.proton.codec.WritableBuffer
import java.nio.ByteBuffer
/**
* NettyWritable is a utility class allow proton-j encoders to write directly into a
* netty ByteBuf, without any need to materialize a ByteArray copy.
*/
internal class NettyWritable(val nettyBuffer: ByteBuf) : WritableBuffer {
override fun put(b: Byte) {
nettyBuffer.writeByte(b.toInt())
}
override fun putFloat(f: Float) {
nettyBuffer.writeFloat(f)
}
override fun putDouble(d: Double) {
nettyBuffer.writeDouble(d)
}
override fun put(src: ByteArray, offset: Int, length: Int) {
nettyBuffer.writeBytes(src, offset, length)
}
override fun putShort(s: Short) {
nettyBuffer.writeShort(s.toInt())
}
override fun putInt(i: Int) {
nettyBuffer.writeInt(i)
}
override fun putLong(l: Long) {
nettyBuffer.writeLong(l)
}
override fun hasRemaining(): Boolean {
return nettyBuffer.writerIndex() < nettyBuffer.capacity()
}
override fun remaining(): Int {
return nettyBuffer.capacity() - nettyBuffer.writerIndex()
}
override fun position(): Int {
return nettyBuffer.writerIndex()
}
override fun position(position: Int) {
nettyBuffer.writerIndex(position)
}
override fun put(payload: ByteBuffer) {
nettyBuffer.writeBytes(payload)
}
override fun limit(): Int {
return nettyBuffer.capacity()
}
}

View File

@ -0,0 +1,14 @@
package net.corda.node.internal.protonwrapper.messages
import net.corda.core.utilities.NetworkHostAndPort
/**
* Represents a common interface for both sendable and received application messages.
*/
interface ApplicationMessage {
val payload: ByteArray
val topic: String
val destinationLegalName: String
val destinationLink: NetworkHostAndPort
val applicationProperties: Map<Any?, Any?>
}

View File

@ -0,0 +1,11 @@
package net.corda.node.internal.protonwrapper.messages
/**
* The processing state of a message.
*/
enum class MessageStatus {
Unsent,
Sent,
Acknowledged,
Rejected
}

View File

@ -0,0 +1,13 @@
package net.corda.node.internal.protonwrapper.messages
import net.corda.core.utilities.NetworkHostAndPort
/**
* An extension of ApplicationMessage that includes origin information.
*/
interface ReceivedMessage : ApplicationMessage {
val sourceLegalName: String
val sourceLink: NetworkHostAndPort
fun complete(accepted: Boolean)
}

View File

@ -0,0 +1,10 @@
package net.corda.node.internal.protonwrapper.messages
import net.corda.core.concurrent.CordaFuture
/**
* An extension of ApplicationMessage to allow completion signalling.
*/
interface SendableMessage : ApplicationMessage {
val onComplete: CordaFuture<MessageStatus>
}

View File

@ -0,0 +1,30 @@
package net.corda.node.internal.protonwrapper.messages.impl
import io.netty.channel.Channel
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.messages.ReceivedMessage
import org.apache.qpid.proton.engine.Delivery
/**
* An internal packet management class that allows tracking of asynchronous acknowledgements
* that in turn send Delivery messages back to the originator.
*/
internal class ReceivedMessageImpl(override val payload: ByteArray,
override val topic: String,
override val sourceLegalName: String,
override val sourceLink: NetworkHostAndPort,
override val destinationLegalName: String,
override val destinationLink: NetworkHostAndPort,
override val applicationProperties: Map<Any?, Any?>,
private val channel: Channel,
private val delivery: Delivery) : ReceivedMessage {
data class MessageCompleter(val status: MessageStatus, val delivery: Delivery)
override fun complete(accepted: Boolean) {
val status = if (accepted) MessageStatus.Acknowledged else MessageStatus.Rejected
channel.writeAndFlush(MessageCompleter(status, delivery))
}
override fun toString(): String = "Received ${String(payload)} $topic"
}

View File

@ -0,0 +1,37 @@
package net.corda.node.internal.protonwrapper.messages.impl
import io.netty.buffer.ByteBuf
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.messages.SendableMessage
/**
* An internal packet management class that allows handling of the encoded buffers and
* allows registration of an acknowledgement handler when the remote receiver confirms durable storage.
*/
internal class SendableMessageImpl(override val payload: ByteArray,
override val topic: String,
override val destinationLegalName: String,
override val destinationLink: NetworkHostAndPort,
override val applicationProperties: Map<Any?, Any?>) : SendableMessage {
var buf: ByteBuf? = null
@Volatile
var status: MessageStatus = MessageStatus.Unsent
private val _onComplete = openFuture<MessageStatus>()
override val onComplete: CordaFuture<MessageStatus> get() = _onComplete
fun release() {
buf?.release()
buf = null
}
fun doComplete(status: MessageStatus) {
this.status = status
_onComplete.set(status)
}
override fun toString(): String = "Sendable ${String(payload)} $topic $status"
}

View File

@ -0,0 +1,156 @@
package net.corda.node.internal.protonwrapper.netty
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.channel.socket.SocketChannel
import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeCompletionEvent
import io.netty.util.ReferenceCountUtil
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.toX509CertHolder
import net.corda.core.utilities.debug
import net.corda.node.internal.protonwrapper.engine.EventProcessor
import net.corda.node.internal.protonwrapper.messages.ReceivedMessage
import net.corda.node.internal.protonwrapper.messages.impl.ReceivedMessageImpl
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
import org.apache.qpid.proton.engine.ProtonJTransport
import org.apache.qpid.proton.engine.Transport
import org.apache.qpid.proton.engine.impl.ProtocolTracer
import org.apache.qpid.proton.framing.TransportFrame
import org.bouncycastle.cert.X509CertificateHolder
import org.slf4j.LoggerFactory
import java.net.InetSocketAddress
/**
* An instance of AMQPChannelHandler sits inside the netty pipeline and controls the socket level lifecycle.
* It also add some extra checks to the SSL handshake to support our non-standard certificate checks of legal identity.
* When a valid SSL connections is made then it initialises a proton-j engine instance to handle the protocol layer.
*/
internal class AMQPChannelHandler(private val serverMode: Boolean,
private val allowedRemoteLegalNames: Set<CordaX500Name>?,
private val userName: String?,
private val password: String?,
private val trace: Boolean,
private val onOpen: (Pair<SocketChannel, ConnectionChange>) -> Unit,
private val onClose: (Pair<SocketChannel, ConnectionChange>) -> Unit,
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler")
private lateinit var remoteAddress: InetSocketAddress
private lateinit var localCert: X509CertificateHolder
private lateinit var remoteCert: X509CertificateHolder
private var eventProcessor: EventProcessor? = null
override fun channelActive(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
remoteAddress = ch.remoteAddress() as InetSocketAddress
val localAddress = ch.localAddress() as InetSocketAddress
log.info("New client connection ${ch.id()} from ${remoteAddress} to ${localAddress}")
}
private fun createAMQPEngine(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
eventProcessor = EventProcessor(ch, serverMode, localCert.subject.toString(), remoteCert.subject.toString(), userName, password)
val connection = eventProcessor!!.connection
val transport = connection.transport as ProtonJTransport
if (trace) {
transport.protocolTracer = object : ProtocolTracer {
override fun sentFrame(transportFrame: TransportFrame) {
log.info("${transportFrame.body}")
}
override fun receivedFrame(transportFrame: TransportFrame) {
log.info("${transportFrame.body}")
}
}
}
ctx.fireChannelActive()
eventProcessor!!.processEventsAsync()
}
override fun channelInactive(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
log.info("Closed client connection ${ch.id()} from ${remoteAddress} to ${ch.localAddress()}")
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, null, false)))
eventProcessor?.close()
ctx.fireChannelInactive()
}
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is SslHandshakeCompletionEvent) {
if (evt.isSuccess) {
val sslHandler = ctx.pipeline().get(SslHandler::class.java)
localCert = sslHandler.engine().session.localCertificates.first().toX509CertHolder()
remoteCert = sslHandler.engine().session.peerCertificates.first().toX509CertHolder()
try {
val remoteX500Name = CordaX500Name.parse(remoteCert.subject.toString())
require(allowedRemoteLegalNames == null || remoteX500Name in allowedRemoteLegalNames)
log.info("handshake completed subject: ${remoteX500Name}")
} catch (ex: IllegalArgumentException) {
log.error("Invalid certificate subject", ex)
ctx.close()
return
}
createAMQPEngine(ctx)
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true)))
} else {
log.error("Handshake failure $evt")
ctx.close()
}
}
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
try {
log.debug { "Received $msg" }
if (msg is ByteBuf) {
eventProcessor!!.transportProcessInput(msg)
}
} finally {
ReferenceCountUtil.release(msg)
}
eventProcessor!!.processEventsAsync()
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
try {
try {
log.debug { "Sent $msg" }
when (msg) {
// Transfers application packet into the AMQP engine.
is SendableMessageImpl -> {
val inetAddress = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port)
require(inetAddress == remoteAddress) {
"Message for incorrect endpoint"
}
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.parse(remoteCert.subject.toString())) {
"Message for incorrect legal identity"
}
log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
eventProcessor!!.transportWriteMessage(msg)
}
// A received AMQP packet has been completed and this self-posted packet will be signalled out to the
// external application.
is ReceivedMessage -> {
onReceive(msg)
}
// A general self-posted event that triggers creation of AMQP frames when required.
is Transport -> {
eventProcessor!!.transportProcessOutput(ctx)
}
// A self-posted event that forwards status updates for delivered packets to the application.
is ReceivedMessageImpl.MessageCompleter -> {
eventProcessor!!.complete(msg)
}
}
} catch (ex: Exception) {
log.error("Error in AMQP write processing", ex)
throw ex
}
} finally {
ReferenceCountUtil.release(msg)
}
eventProcessor!!.processEventsAsync()
}
}

View File

@ -0,0 +1,194 @@
package net.corda.node.internal.protonwrapper.netty
import io.netty.bootstrap.Bootstrap
import io.netty.channel.*
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.util.internal.logging.InternalLoggerFactory
import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.protonwrapper.messages.ReceivedMessage
import net.corda.node.internal.protonwrapper.messages.SendableMessage
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
import rx.Observable
import rx.subjects.PublishSubject
import java.security.KeyStore
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.TrustManagerFactory
import kotlin.concurrent.withLock
/**
* The AMQPClient creates a connection initiator that will try to connect in a round-robin fashion
* to the first open SSL socket. It will keep retrying until it is stopped.
* To allow thread resource control it can accept a shared thread pool as constructor input,
* otherwise it creates a self-contained Netty thraed pool and socket objects.
* Once connected it can accept application packets to send via the AMQP protocol.
*/
class AMQPClient(val targets: List<NetworkHostAndPort>,
val allowedRemoteLegalNames: Set<CordaX500Name>,
private val userName: String?,
private val password: String?,
private val keyStore: KeyStore,
private val keyStorePrivateKeyPassword: String,
private val trustStore: KeyStore,
private val trace: Boolean = false,
private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable {
companion object {
init {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
}
val log = contextLogger()
const val RETRY_INTERVAL = 1000L
const val NUM_CLIENT_THREADS = 2
}
private val lock = ReentrantLock()
@Volatile
private var stopping: Boolean = false
private var workerGroup: EventLoopGroup? = null
@Volatile
private var clientChannel: Channel? = null
// Offset into the list of targets, so that we can implement round-robin reconnect logic.
private var targetIndex = 0
private var currentTarget: NetworkHostAndPort = targets.first()
private val connectListener = object : ChannelFutureListener {
override fun operationComplete(future: ChannelFuture) {
if (!future.isSuccess) {
log.info("Failed to connect to $currentTarget")
if (!stopping) {
workerGroup?.schedule({
log.info("Retry connect to $currentTarget")
targetIndex = (targetIndex + 1).rem(targets.size)
restart()
}, RETRY_INTERVAL, TimeUnit.MILLISECONDS)
}
} else {
log.info("Connected to $currentTarget")
// Connection established successfully
clientChannel = future.channel()
clientChannel?.closeFuture()?.addListener(closeListener)
}
}
}
private val closeListener = object : ChannelFutureListener {
override fun operationComplete(future: ChannelFuture) {
log.info("Disconnected from $currentTarget")
future.channel()?.disconnect()
clientChannel = null
if (!stopping) {
workerGroup?.schedule({
log.info("Retry connect")
targetIndex = (targetIndex + 1).rem(targets.size)
restart()
}, RETRY_INTERVAL, TimeUnit.MILLISECONDS)
}
}
}
private class ClientChannelInitializer(val parent: AMQPClient) : ChannelInitializer<SocketChannel>() {
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
init {
keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword.toCharArray())
trustManagerFactory.init(parent.trustStore)
}
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
val handler = createClientSslHelper(parent.currentTarget, keyManagerFactory, trustManagerFactory)
pipeline.addLast("sslHandler", handler)
if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
pipeline.addLast(AMQPChannelHandler(false,
parent.allowedRemoteLegalNames,
parent.userName,
parent.password,
parent.trace,
{ parent._onConnection.onNext(it.second) },
{ parent._onConnection.onNext(it.second) },
{ rcv -> parent._onReceive.onNext(rcv) }))
}
}
fun start() {
lock.withLock {
log.info("connect to: $currentTarget")
workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS)
restart()
}
}
private fun restart() {
val bootstrap = Bootstrap()
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
bootstrap.group(workerGroup).
channel(NioSocketChannel::class.java).
handler(ClientChannelInitializer(this))
currentTarget = targets[targetIndex]
val clientFuture = bootstrap.connect(currentTarget.host, currentTarget.port)
clientFuture.addListener(connectListener)
}
fun stop() {
lock.withLock {
log.info("disconnect from: $currentTarget")
stopping = true
try {
if (sharedThreadPool == null) {
workerGroup?.shutdownGracefully()
workerGroup?.terminationFuture()?.sync()
} else {
clientChannel?.close()?.sync()
}
clientChannel = null
workerGroup = null
} finally {
stopping = false
}
log.info("stopped connection to $currentTarget")
}
}
override fun close() = stop()
val connected: Boolean
get() {
val channel = lock.withLock { clientChannel }
return channel?.isActive ?: false
}
fun createMessage(payload: ByteArray,
topic: String,
destinationLegalName: String,
properties: Map<Any?, Any?>): SendableMessage {
return SendableMessageImpl(payload, topic, destinationLegalName, currentTarget, properties)
}
fun write(msg: SendableMessage) {
val channel = clientChannel
if (channel == null) {
throw IllegalStateException("Connection to $targets not active")
} else {
channel.writeAndFlush(msg)
}
}
private val _onReceive = PublishSubject.create<ReceivedMessage>().toSerialized()
val onReceive: Observable<ReceivedMessage>
get() = _onReceive
private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized()
val onConnection: Observable<ConnectionChange>
get() = _onConnection
}

View File

@ -0,0 +1,187 @@
package net.corda.node.internal.protonwrapper.netty
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.util.internal.logging.InternalLoggerFactory
import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.protonwrapper.messages.ReceivedMessage
import net.corda.node.internal.protonwrapper.messages.SendableMessage
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
import org.apache.qpid.proton.engine.Delivery
import rx.Observable
import rx.subjects.PublishSubject
import java.net.BindException
import java.net.InetSocketAddress
import java.security.KeyStore
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.TrustManagerFactory
import kotlin.concurrent.withLock
/**
* This create a socket acceptor instance that can receive possibly multiple AMQP connections.
* As of now this is not used outside of testing, but in future it will be used for standalone bridging components.
*/
class AMQPServer(val hostName: String,
val port: Int,
private val userName: String?,
private val password: String?,
private val keyStore: KeyStore,
private val keyStorePrivateKeyPassword: String,
private val trustStore: KeyStore,
private val trace: Boolean = false) : AutoCloseable {
companion object {
init {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
}
private val log = contextLogger()
const val NUM_SERVER_THREADS = 4
}
private val lock = ReentrantLock()
@Volatile
private var stopping: Boolean = false
private var bossGroup: EventLoopGroup? = null
private var workerGroup: EventLoopGroup? = null
private var serverChannel: Channel? = null
private val clientChannels = ConcurrentHashMap<InetSocketAddress, SocketChannel>()
init {
}
private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() {
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
init {
keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword.toCharArray())
trustManagerFactory.init(parent.trustStore)
}
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
val handler = createServerSslHelper(keyManagerFactory, trustManagerFactory)
pipeline.addLast("sslHandler", handler)
if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
pipeline.addLast(AMQPChannelHandler(true,
null,
parent.userName,
parent.password,
parent.trace,
{
parent.clientChannels.put(it.first.remoteAddress(), it.first)
parent._onConnection.onNext(it.second)
},
{
parent.clientChannels.remove(it.first.remoteAddress())
parent._onConnection.onNext(it.second)
},
{ rcv -> parent._onReceive.onNext(rcv) }))
}
}
fun start() {
lock.withLock {
stop()
bossGroup = NioEventLoopGroup(1)
workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS)
val server = ServerBootstrap()
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
server.group(bossGroup, workerGroup).
channel(NioServerSocketChannel::class.java).
option(ChannelOption.SO_BACKLOG, 100).
handler(LoggingHandler(LogLevel.INFO)).
childHandler(ServerChannelInitializer(this))
log.info("Try to bind $port")
val channelFuture = server.bind(hostName, port).sync() // block/throw here as better to know we failed to claim port than carry on
if (!channelFuture.isDone || !channelFuture.isSuccess) {
throw BindException("Failed to bind port $port")
}
log.info("Listening on port $port")
serverChannel = channelFuture.channel()
}
}
fun stop() {
lock.withLock {
try {
stopping = true
serverChannel?.apply { close() }
serverChannel = null
workerGroup?.shutdownGracefully()
workerGroup?.terminationFuture()?.sync()
bossGroup?.shutdownGracefully()
bossGroup?.terminationFuture()?.sync()
workerGroup = null
bossGroup = null
} finally {
stopping = false
}
}
}
override fun close() = stop()
val listening: Boolean
get() {
val channel = lock.withLock { serverChannel }
return channel?.isActive ?: false
}
fun createMessage(payload: ByteArray,
topic: String,
destinationLegalName: String,
destinationLink: NetworkHostAndPort,
properties: Map<Any?, Any?>): SendableMessage {
val dest = InetSocketAddress(destinationLink.host, destinationLink.port)
require(dest in clientChannels.keys) {
"Destination not available"
}
return SendableMessageImpl(payload, topic, destinationLegalName, destinationLink, properties)
}
fun write(msg: SendableMessage) {
val dest = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port)
val channel = clientChannels[dest]
if (channel == null) {
throw IllegalStateException("Connection to ${msg.destinationLink} not active")
} else {
channel.writeAndFlush(msg)
}
}
fun complete(delivery: Delivery, target: InetSocketAddress) {
val channel = clientChannels[target]
channel?.apply {
writeAndFlush(delivery)
}
}
private val _onReceive = PublishSubject.create<ReceivedMessage>().toSerialized()
val onReceive: Observable<ReceivedMessage>
get() = _onReceive
private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized()
val onConnection: Observable<ConnectionChange>
get() = _onConnection
}

View File

@ -0,0 +1,6 @@
package net.corda.node.internal.protonwrapper.netty
import org.bouncycastle.cert.X509CertificateHolder
import java.net.InetSocketAddress
data class ConnectionChange(val remoteAddress: InetSocketAddress, val remoteCert: X509CertificateHolder?, val connected: Boolean)

View File

@ -0,0 +1,39 @@
package net.corda.node.internal.protonwrapper.netty
import io.netty.handler.ssl.SslHandler
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.ArtemisTcpTransport
import java.security.SecureRandom
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManagerFactory
internal fun createClientSslHelper(target: NetworkHostAndPort,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers
sslContext.init(keyManagers, trustManagers, SecureRandom())
val sslEngine = sslContext.createSSLEngine(target.host, target.port)
sslEngine.useClientMode = true
sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray()
sslEngine.enabledCipherSuites = ArtemisTcpTransport.CIPHER_SUITES.toTypedArray()
sslEngine.enableSessionCreation = true
return SslHandler(sslEngine)
}
internal fun createServerSslHelper(keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers
sslContext.init(keyManagers, trustManagers, SecureRandom())
val sslEngine = sslContext.createSSLEngine()
sslEngine.useClientMode = false
sslEngine.needClientAuth = true
sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray()
sslEngine.enabledCipherSuites = ArtemisTcpTransport.CIPHER_SUITES.toTypedArray()
sslEngine.enableSessionCreation = true
return SslHandler(sslEngine)
}

View File

@ -6,10 +6,10 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import java.net.URL
import java.nio.file.Path
import java.util.*
@ -42,6 +42,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val detectPublicIp: Boolean get() = true
val sshd: SSHDConfiguration?
val database: DatabaseConfig
val useAMQPBridges: Boolean get() = true
}
data class DevModeOptions(val disableCheckpointChecker: Boolean = false)
@ -116,7 +117,8 @@ data class NodeConfigurationImpl(
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
override val sshd: SSHDConfiguration? = null,
override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode, exportHibernateJMXStatistics = devMode)
override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode, exportHibernateJMXStatistics = devMode),
override val useAMQPBridges: Boolean = true
) : NodeConfiguration {
override val exportJMXto: String get() = "http"

View File

@ -0,0 +1,203 @@
package net.corda.node.services.messaging
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.netty.AMQPClient
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.crypto.loadKeyStore
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.slf4j.LoggerFactory
import rx.Subscription
import java.security.KeyStore
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* The AMQPBridgeManager holds the list of independent AMQPBridge objects that actively ferry messages to remote Artemis
* inboxes.
* The AMQPBridgeManager also provides a single shared connection to Artemis, although each bridge then creates an
* independent Session for message consumption.
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
*/
internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager {
private val lock = ReentrantLock()
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
private var sharedEventLoopGroup: EventLoopGroup? = null
private val keyStore = loadKeyStore(config.sslKeystore, config.keyStorePassword)
private val keyStorePrivateKeyPassword: String = config.keyStorePassword
private val trustStore = loadKeyStore(config.trustStoreFile, config.trustStorePassword)
private var artemis: ArtemisMessagingClient? = null
companion object {
private const val NUM_BRIDGE_THREADS = 0 // Default sized pool
}
/**
* Each AMQPBridge is an independent consumer of messages from the Artemis local queue per designated endpoint.
* It attempts to deliver these messages via an AMQPClient instance to the remote Artemis inbox.
* To prevent race conditions the Artemis session/consumer is only created when the AMQPClient has a stable AMQP connection.
* The acknowledgement and removal of messages from the local queue only occurs if there successful end-to-end delivery.
* If the delivery fails the session is rolled back to prevent loss of the message. This may cause duplicate delivery,
* however Artemis and the remote Corda instanced will deduplicate these messages.
*/
private class AMQPBridge(private val queueName: String,
private val target: NetworkHostAndPort,
private val legalNames: Set<CordaX500Name>,
keyStore: KeyStore,
keyStorePrivateKeyPassword: String,
trustStore: KeyStore,
sharedEventGroup: EventLoopGroup,
private val artemis: ArtemisMessagingClient) {
companion object {
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
}
private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}")
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, sharedThreadPool = sharedEventGroup)
val bridgeName: String get() = getBridgeName(queueName, target)
private val lock = ReentrantLock() // lock to serialise session level access
private var session: ClientSession? = null
private var consumer: ClientConsumer? = null
private var connectedSubscription: Subscription? = null
fun start() {
log.info("Create new AMQP bridge")
connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) })
amqpClient.start()
}
fun stop() {
log.info("Stopping AMQP bridge")
lock.withLock {
synchronized(artemis) {
consumer?.close()
consumer = null
session?.stop()
session = null
}
}
amqpClient.stop()
connectedSubscription?.unsubscribe()
connectedSubscription = null
}
private fun onSocketConnected(connected: Boolean) {
lock.withLock {
synchronized(artemis) {
if (connected) {
log.info("Bridge Connected")
val sessionFactory = artemis.started!!.sessionFactory
val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, false, false, false, DEFAULT_ACK_BATCH_SIZE)
this.session = session
val consumer = session.createConsumer(queueName)
this.consumer = consumer
consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler)
session.start()
} else {
log.info("Bridge Disconnected")
consumer?.close()
consumer = null
session?.stop()
session = null
}
}
}
}
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
lock.withLock {
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
val properties = HashMap<Any?, Any?>()
for (key in artemisMessage.propertyNames) {
var value = artemisMessage.getObjectProperty(key)
if (value is SimpleString) {
value = value.toString()
}
properties[key.toString()] = value
}
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
val sendableMessage = amqpClient.createMessage(data, P2P_QUEUE,
legalNames.first().toString(),
properties)
sendableMessage.onComplete.then {
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
lock.withLock {
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
artemisMessage.acknowledge()
session?.commit()
} else {
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
session?.rollback(false)
}
}
}
amqpClient.write(sendableMessage)
}
}
}
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> {
val address = node.addresses.first()
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
}
override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {
if (bridgeExists(getBridgeName(queueName, target))) {
return
}
val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, sharedEventLoopGroup!!, artemis!!)
lock.withLock {
bridgeNameToBridgeMap[newBridge.bridgeName] = newBridge
}
newBridge.start()
}
override fun destroyBridges(node: NodeInfo) {
lock.withLock {
gatherAddresses(node).forEach {
val bridge = bridgeNameToBridgeMap.remove(getBridgeName(it.queueName, it.hostAndPort))
bridge?.stop()
}
}
}
override fun bridgeExists(bridgeName: String): Boolean = lock.withLock { bridgeNameToBridgeMap.containsKey(bridgeName) }
override fun start() {
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS)
val artemis = ArtemisMessagingClient(config, p2pAddress, maxMessageSize)
this.artemis = artemis
artemis.start()
}
override fun stop() = close()
override fun close() {
lock.withLock {
for (bridge in bridgeNameToBridgeMap.values) {
bridge.stop()
}
sharedEventLoopGroup?.shutdownGracefully()
sharedEventLoopGroup?.terminationFuture()?.sync()
sharedEventLoopGroup = null
bridgeNameToBridgeMap.clear()
artemis?.stop()
}
}
}

View File

@ -3,12 +3,15 @@ package net.corda.node.services.messaging
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.config.SSLConfiguration
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
class ArtemisMessagingClient(private val config: SSLConfiguration, private val serverAddress: NetworkHostAndPort, private val maxMessageSize: Int) {
companion object {
@ -46,7 +49,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration, private val s
}
fun stop() = synchronized(this) {
started!!.run {
started?.run {
producer.close()
// Ensure any trailing messages are committed to the journal
session.commit()

View File

@ -1,6 +1,5 @@
package net.corda.node.services.messaging
import io.netty.handler.ssl.SslHandler
import net.corda.core.crypto.AddressFormatException
import net.corda.core.crypto.newSecureRandom
import net.corda.core.identity.CordaX500Name
@ -24,11 +23,10 @@ import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_CLIENT_TLS
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.nodeapi.internal.crypto.loadKeyStore
import net.corda.nodeapi.*
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
@ -37,15 +35,17 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_CLIENT_TLS
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.nodeapi.internal.crypto.loadKeyStore
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.*
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin
@ -53,22 +53,17 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.core.settings.HierarchicalRepository
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.spi.core.remoting.*
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
import org.apache.activemq.artemis.utils.ConfigurationHelper
import rx.Subscription
import java.io.IOException
import java.math.BigInteger
import java.security.KeyStore
import java.security.KeyStoreException
import java.security.Principal
import java.time.Duration
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
import javax.annotation.concurrent.ThreadSafe
import javax.security.auth.Subject
import javax.security.auth.callback.CallbackHandler
@ -80,7 +75,6 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
import javax.security.auth.login.FailedLoginException
import javax.security.auth.login.LoginException
import javax.security.auth.spi.LoginModule
import javax.security.auth.x500.X500Principal
import javax.security.cert.CertificateException
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
@ -114,6 +108,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
private lateinit var activeMQServer: ActiveMQServer
val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
private var networkChangeHandle: Subscription? = null
private lateinit var bridgeManager: BridgeManager
init {
config.baseDirectory.requireOnDefaultFileSystem()
@ -133,6 +128,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
}
fun stop() = mutex.locked {
bridgeManager.close()
networkChangeHandle?.unsubscribe()
networkChangeHandle = null
activeMQServer.stop()
@ -153,7 +149,14 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) }
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
}
// Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges.
bridgeManager = if (config.useAMQPBridges) {
AMQPBridgeManager(config, NetworkHostAndPort("localhost", p2pPort), maxMessageSize)
} else {
CoreBridgeManager(config, activeMQServer)
}
activeMQServer.start()
bridgeManager.start()
Node.printBasicNodeInfo("Listening on port", p2pPort.toString())
if (rpcPort != null) {
Node.printBasicNodeInfo("RPC service listening on port", rpcPort.toString())
@ -212,11 +215,13 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
addressFullMessagePolicy = AddressFullMessagePolicy.FAIL
}
)
// JMX enablement
if (config.exportJMXto.isNotEmpty()) {isJMXManagementEnabled = true
isJMXUseBrokerName = true}
// JMX enablement
if (config.exportJMXto.isNotEmpty()) {
isJMXManagementEnabled = true
isJMXUseBrokerName = true
}
}.configureAddressSecurity()
}.configureAddressSecurity()
private fun queueConfig(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration {
@ -299,7 +304,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
fun deployBridgeToPeer(nodeInfo: NodeInfo) {
log.debug("Deploying bridge for $queueName to $nodeInfo")
val address = nodeInfo.addresses.first()
deployBridge(queueName, address, nodeInfo.legalIdentitiesAndCerts.map { it.name }.toSet())
bridgeManager.deployBridge(queueName, address, nodeInfo.legalIdentitiesAndCerts.map { it.name }.toSet())
}
if (queueName.startsWith(PEERS_PREFIX)) {
@ -334,147 +339,39 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
fun deployBridges(node: NodeInfo) {
gatherAddresses(node)
.filter { queueExists(it.queueName) && !bridgeExists(it.bridgeName) }
.filter { queueExists(it.queueName) && !bridgeManager.bridgeExists(it.bridgeName) }
.forEach { deployBridge(it, node.legalIdentitiesAndCerts.map { it.name }.toSet()) }
}
fun destroyBridges(node: NodeInfo) {
gatherAddresses(node).forEach {
activeMQServer.destroyBridge(it.bridgeName)
}
}
when (change) {
is MapChange.Added -> {
deployBridges(change.node)
}
is MapChange.Removed -> {
destroyBridges(change.node)
bridgeManager.destroyBridges(change.node)
}
is MapChange.Modified -> {
// TODO Figure out what has actually changed and only destroy those bridges that need to be.
destroyBridges(change.previousNode)
bridgeManager.destroyBridges(change.previousNode)
deployBridges(change.node)
}
}
}
private fun deployBridge(address: ArtemisPeerAddress, legalNames: Set<CordaX500Name>) {
deployBridge(address.queueName, address.hostAndPort, legalNames)
bridgeManager.deployBridge(address.queueName, address.hostAndPort, legalNames)
}
private fun createTcpTransport(connectionDirection: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true) =
ArtemisTcpTransport.tcpTransport(connectionDirection, NetworkHostAndPort(host, port), config, enableSSL = enableSSL)
/**
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address.
*/
private fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {
val connectionDirection = ConnectionDirection.Outbound(
connectorFactoryClassName = VerifyingNettyConnectorFactory::class.java.name,
expectedCommonNames = legalNames
)
val tcpTransport = createTcpTransport(connectionDirection, target.host, target.port)
tcpTransport.params[ArtemisMessagingServer::class.java.name] = this
// We intentionally overwrite any previous connector config in case the peer legal name changed
activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport)
activeMQServer.deployBridge(BridgeConfiguration().apply {
name = getBridgeName(queueName, target)
this.queueName = queueName
forwardingAddress = P2P_QUEUE
staticConnectors = listOf(target.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
// We keep trying until the network map deems the node unreachable and tells us it's been removed at which
// point we destroy the bridge
retryInterval = config.activeMQServer.bridge.retryIntervalMs
retryIntervalMultiplier = config.activeMQServer.bridge.retryIntervalMultiplier
maxRetryInterval = Duration.ofMinutes(config.activeMQServer.bridge.maxRetryIntervalMin).toMillis()
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
// our TLS certificate.
user = PEER_USER
password = PEER_USER
})
}
private fun queueExists(queueName: String): Boolean = activeMQServer.queueQuery(SimpleString(queueName)).isExists
private fun bridgeExists(bridgeName: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(bridgeName)
private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
}
class VerifyingNettyConnectorFactory : NettyConnectorFactory() {
override fun createConnector(configuration: MutableMap<String, Any>,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?): Connector {
return VerifyingNettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool,
protocolManager)
}
}
private class VerifyingNettyConnector(configuration: MutableMap<String, Any>,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?) :
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
companion object {
private val log = contextLogger()
}
private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration)
override fun createConnection(): Connection? {
val connection = super.createConnection() as? NettyConnection
if (sslEnabled && connection != null) {
val expectedLegalNames: Set<CordaX500Name> = uncheckedCast(configuration[ArtemisTcpTransport.VERIFY_PEER_LEGAL_NAME] ?: emptySet<CordaX500Name>())
try {
val session = connection.channel
.pipeline()
.get(SslHandler::class.java)
.engine()
.session
// Checks the peer name is the one we are expecting.
// TODO Some problems here: after introduction of multiple legal identities on the node and removal of the main one,
// we run into the issue, who are we connecting to. There are some solutions to that: advertise `network identity`;
// have mapping port -> identity (but, design doc says about removing SingleMessageRecipient and having just NetworkHostAndPort,
// it was convenient to store that this way); SNI.
val peerLegalName = CordaX500Name.parse(session.peerPrincipal.name)
val expectedLegalName = expectedLegalNames.singleOrNull { it == peerLegalName }
require(expectedLegalName != null) {
"Peer has wrong CN - expected $expectedLegalNames but got $peerLegalName. This is either a fatal " +
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
}
// Make sure certificate has the same name.
val peerCertificateName = CordaX500Name.build(X500Principal(session.peerCertificateChain[0].subjectDN.name))
require(peerCertificateName == expectedLegalName) {
"Peer has wrong subject name in the certificate - expected $expectedLegalNames but got $peerCertificateName. This is either a fatal " +
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
}
X509Utilities.validateCertificateChain(session.localCertificates.last() as java.security.cert.X509Certificate, *session.peerCertificates)
} catch (e: IllegalArgumentException) {
connection.close()
log.error(e.message)
return null
}
}
return connection
}
}
sealed class CertificateChainCheckPolicy {
@FunctionalInterface

View File

@ -0,0 +1,20 @@
package net.corda.node.services.messaging
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
/**
* Provides an internal interface that the [ArtemisMessagingServer] delegates to for Bridge activities.
*/
internal interface BridgeManager : AutoCloseable {
fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>)
fun destroyBridges(node: NodeInfo)
fun bridgeExists(bridgeName: String): Boolean
fun start()
fun stop()
}

View File

@ -0,0 +1,166 @@
package net.corda.node.services.messaging
import io.netty.handler.ssl.SslHandler
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.services.config.NodeConfiguration
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.crypto.X509Utilities
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.spi.core.remoting.*
import org.apache.activemq.artemis.utils.ConfigurationHelper
import java.time.Duration
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
import javax.security.auth.x500.X500Principal
/**
* This class simply moves the legacy CORE bridge code from [ArtemisMessagingServer]
* into a class implementing [BridgeManager].
* It has no lifecycle events, because the bridges are internal to the ActiveMQServer instance and thus
* stop when it is stopped.
*/
internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServer: ActiveMQServer) : BridgeManager {
companion object {
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
private val ArtemisMessagingComponent.ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
}
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> {
val address = node.addresses.first()
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
}
/**
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address.
*/
override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {
val connectionDirection = ConnectionDirection.Outbound(
connectorFactoryClassName = VerifyingNettyConnectorFactory::class.java.name,
expectedCommonNames = legalNames
)
val tcpTransport = ArtemisTcpTransport.tcpTransport(connectionDirection, target, config, enableSSL = true)
tcpTransport.params[ArtemisMessagingServer::class.java.name] = this
// We intentionally overwrite any previous connector config in case the peer legal name changed
activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport)
activeMQServer.deployBridge(BridgeConfiguration().apply {
name = getBridgeName(queueName, target)
this.queueName = queueName
forwardingAddress = P2P_QUEUE
staticConnectors = listOf(target.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
// We keep trying until the network map deems the node unreachable and tells us it's been removed at which
// point we destroy the bridge
retryInterval = config.activeMQServer.bridge.retryIntervalMs
retryIntervalMultiplier = config.activeMQServer.bridge.retryIntervalMultiplier
maxRetryInterval = Duration.ofMinutes(config.activeMQServer.bridge.maxRetryIntervalMin).toMillis()
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
// our TLS certificate.
user = PEER_USER
password = PEER_USER
})
}
override fun bridgeExists(bridgeName: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(bridgeName)
override fun start() {
// Nothing to do
}
override fun stop() {
// Nothing to do
}
override fun close() = stop()
override fun destroyBridges(node: NodeInfo) {
gatherAddresses(node).forEach {
activeMQServer.destroyBridge(it.bridgeName)
}
}
}
class VerifyingNettyConnectorFactory : NettyConnectorFactory() {
override fun createConnector(configuration: MutableMap<String, Any>,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?): Connector {
return VerifyingNettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool,
protocolManager)
}
private class VerifyingNettyConnector(configuration: MutableMap<String, Any>,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?) :
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
companion object {
private val log = contextLogger()
}
private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration)
override fun createConnection(): Connection? {
val connection = super.createConnection() as? NettyConnection
if (sslEnabled && connection != null) {
val expectedLegalNames: Set<CordaX500Name> = uncheckedCast(configuration[ArtemisTcpTransport.VERIFY_PEER_LEGAL_NAME] ?: emptySet<CordaX500Name>())
try {
val session = connection.channel
.pipeline()
.get(SslHandler::class.java)
.engine()
.session
// Checks the peer name is the one we are expecting.
// TODO Some problems here: after introduction of multiple legal identities on the node and removal of the main one,
// we run into the issue, who are we connecting to. There are some solutions to that: advertise `network identity`;
// have mapping port -> identity (but, design doc says about removing SingleMessageRecipient and having just NetworkHostAndPort,
// it was convenient to store that this way); SNI.
val peerLegalName = CordaX500Name.parse(session.peerPrincipal.name)
val expectedLegalName = expectedLegalNames.singleOrNull { it == peerLegalName }
require(expectedLegalName != null) {
"Peer has wrong CN - expected $expectedLegalNames but got $peerLegalName. This is either a fatal " +
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
}
// Make sure certificate has the same name.
val peerCertificateName = CordaX500Name.build(X500Principal(session.peerCertificateChain[0].subjectDN.name))
require(peerCertificateName == expectedLegalName) {
"Peer has wrong subject name in the certificate - expected $expectedLegalNames but got $peerCertificateName. This is either a fatal " +
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
}
X509Utilities.validateCertificateChain(session.localCertificates.last() as java.security.cert.X509Certificate, *session.peerCertificates)
} catch (e: IllegalArgumentException) {
connection.close()
log.error(e.message)
return null
}
}
return connection
}
}
}

View File

@ -25,3 +25,4 @@ activeMQServer = {
maxRetryIntervalMin = 3
}
}
useAMQPBridges = true

View File

@ -5,16 +5,16 @@ import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.context.AuthServiceId
import net.corda.core.crypto.generateKeyPair
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.configureDatabase
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.internal.configureDatabase
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.*
@ -72,6 +72,7 @@ class ArtemisMessagingTests {
doReturn("").whenever(it).exportJMXto
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
doReturn(true).whenever(it).useAMQPBridges
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())

View File

@ -27,8 +27,8 @@ import net.corda.core.utilities.seconds
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.*
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.messaging.MessagingService
@ -37,14 +37,14 @@ import net.corda.node.services.transactions.BFTSMaRt
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.ServiceIdentityGenerator
import net.corda.nodeapi.internal.NotaryInfo
import net.corda.nodeapi.internal.NetworkParametersCopier
import net.corda.nodeapi.internal.NotaryInfo
import net.corda.nodeapi.internal.ServiceIdentityGenerator
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.DUMMY_NOTARY_NAME
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -495,5 +495,6 @@ private fun mockNodeConfiguration(): NodeConfiguration {
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec
doReturn(null).whenever(it).devModeOptions
doReturn(true).whenever(it).useAMQPBridges
}
}

View File

@ -17,7 +17,6 @@ import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.RPCServer
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.ArtemisTcpTransport
@ -47,13 +46,12 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
import org.apache.activemq.artemis.spi.core.remoting.Connection
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
import java.lang.reflect.Method
import java.nio.file.Path
import java.nio.file.Paths
import java.util.*
import javax.security.cert.X509Certificate
inline fun <reified I : RPCOps> RPCDriverDSL.startInVmRpcClient(
username: String = rpcTestUser.username,
@ -135,11 +133,11 @@ fun <A> rpcDriver(
private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityManager3 {
override fun validateUser(user: String?, password: String?) = isValid(user, password)
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?) = isValid(user, password)
override fun validateUser(user: String?, password: String?, certificates: Array<out X509Certificate>?): String? {
override fun validateUser(user: String?, password: String?, connection: Connection?): String? {
return validate(user, password)
}
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?, address: String?, connection: RemotingConnection?): String? {
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?, address: String?, connection: Connection?): String? {
return validate(user, password)
}