diff --git a/build.gradle b/build.gradle index 490068e188..e416f5a248 100644 --- a/build.gradle +++ b/build.gradle @@ -25,8 +25,7 @@ buildscript { * TODO Upgrade to version 2.4 for large message streaming support * * Due to a memory leak in the connection handling code in Artemis, we are - * temporarily downgrading to version 2.1.0 (version used prior to the 2.4 - * bump). + * temporarily downgrading to version 2.2.0. * * The memory leak essentially triggers an out-of-memory exception within * less than 10 seconds and can take down a node if a non-TLS connection is @@ -35,7 +34,7 @@ buildscript { * The issue has been reported to upstream: * https://issues.apache.org/jira/browse/ARTEMIS-1559 */ - ext.artemis_version = '2.1.0' + ext.artemis_version = '2.2.0' ext.jackson_version = '2.9.2' ext.jetty_version = '9.4.7.v20170914' ext.jersey_version = '2.25' diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt index b3975f3516..5ea77a66fd 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt @@ -9,10 +9,10 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.millis import net.corda.node.services.messaging.RPCServerConfiguration +import net.corda.testing.internal.testThreadFactory import net.corda.testing.node.internal.RPCDriverDSL import net.corda.testing.node.internal.rpcDriver -import net.corda.testing.internal.testThreadFactory -import org.apache.activemq.artemis.utils.ConcurrentHashSet +import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet import org.junit.After import org.junit.Test import org.junit.runner.RunWith diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 5951fdfbb3..6dfb168cc2 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -127,6 +127,9 @@ UNRELEASED * Values for the ``database.transactionIsolationLevel`` config now follow the ``java.sql.Connection`` int constants but without the "TRANSACTION_" prefix, i.e. "NONE", "READ_UNCOMMITTED", etc. +* Peer-to-peer communications is now via AMQP 1.0 as default. + Although the legacy Artemis CORE bridging can still be used by setting the ``useAMQPBridges`` configuration property to false. + .. _changelog_v1: Release 1.0 diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 566e3d5e64..49572d1c8e 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -172,4 +172,7 @@ path to the node's base directory. :port: The port to start SSH server on :exportJMXTo: If set to ``http``, will enable JMX metrics reporting via the Jolokia HTTP/JSON agent. - Default Jolokia access url is http://127.0.0.1:7005/jolokia/ \ No newline at end of file + Default Jolokia access url is http://127.0.0.1:7005/jolokia/ + +:useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications. + Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes. \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt index ab1720934a..9811800de0 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt @@ -29,12 +29,14 @@ class ArtemisTcpTransport { // but we allow classical RSA certificates to work in case: // a) we need to use keytool certificates in some demos, // b) we use cloud providers or HSMs that do not support ECC. - private val CIPHER_SUITES = listOf( + val CIPHER_SUITES = listOf( "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256" ) + val TLS_VERSIONS = listOf("TLSv1.2") + fun tcpTransport( direction: ConnectionDirection, hostAndPort: NetworkHostAndPort, @@ -68,7 +70,7 @@ class ArtemisTcpTransport { TransportConstants.TRUSTSTORE_PATH_PROP_NAME to config.trustStoreFile, TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword, TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","), - TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2", + TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(","), TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true, VERIFY_PEER_LEGAL_NAME to (direction as? ConnectionDirection.Outbound)?.expectedCommonNames ) diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt new file mode 100644 index 0000000000..1e094f4194 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -0,0 +1,241 @@ +package net.corda.node.amqp + +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.internal.div +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.NetworkMapCache +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.toBase58String +import net.corda.node.internal.protonwrapper.netty.AMQPServer +import net.corda.node.internal.security.RPCSecurityManager +import net.corda.node.services.config.* +import net.corda.node.services.messaging.ArtemisMessagingClient +import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE +import net.corda.nodeapi.internal.crypto.loadKeyStore +import net.corda.testing.* +import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.junit.Assert.assertArrayEquals +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import rx.Observable +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertNotEquals + +class AMQPBridgeTest { + @Rule + @JvmField + val temporaryFolder = TemporaryFolder() + + private val ALICE = TestIdentity(ALICE_NAME) + private val BOB = TestIdentity(BOB_NAME) + + private val artemisPort = freePort() + private val artemisPort2 = freePort() + private val amqpPort = freePort() + private val artemisAddress = NetworkHostAndPort("localhost", artemisPort) + private val artemisAddress2 = NetworkHostAndPort("localhost", artemisPort2) + private val amqpAddress = NetworkHostAndPort("localhost", amqpPort) + + private abstract class AbstractNodeConfiguration : NodeConfiguration + + @Test + fun `test acked and nacked messages`() { + // Create local queue + val sourceQueueName = "internal.peers." + BOB.publicKey.toBase58String() + val (artemisServer, artemisClient) = createArtemis(sourceQueueName) + + // Pre-populate local queue with 3 messages + val artemis = artemisClient.started!! + for (i in 0 until 3) { + val artemisMessage = artemis.session.createMessage(true).apply { + putIntProperty("CountProp", i) + writeBodyBufferBytes("Test$i".toByteArray()) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } + artemis.producer.send(sourceQueueName, artemisMessage) + } + + //Create target server + val amqpServer = createAMQPServer() + + val receive = amqpServer.onReceive.toBlocking().iterator + amqpServer.start() + + val received1 = receive.next() + val messageID1 = received1.applicationProperties["CountProp"] as Int + assertArrayEquals("Test$messageID1".toByteArray(), received1.payload) + assertEquals(0, messageID1) + received1.complete(true) // Accept first message + + val received2 = receive.next() + val messageID2 = received2.applicationProperties["CountProp"] as Int + assertArrayEquals("Test$messageID2".toByteArray(), received2.payload) + assertEquals(1, messageID2) + received2.complete(false) // Reject message + + while (true) { + val received3 = receive.next() + val messageID3 = received3.applicationProperties["CountProp"] as Int + assertArrayEquals("Test$messageID3".toByteArray(), received3.payload) + assertNotEquals(0, messageID3) + if (messageID3 != 1) { // keep rejecting any batched items following rejection + received3.complete(false) + } else { // beginnings of replay so accept again + received3.complete(true) + break + } + } + + while (true) { + val received4 = receive.next() + val messageID4 = received4.applicationProperties["CountProp"] as Int + assertArrayEquals("Test$messageID4".toByteArray(), received4.payload) + if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip + assertEquals(2, messageID4) // next message should be in order though + break + } + received4.complete(true) + } + + // Send a fresh item and check receive + val artemisMessage = artemis.session.createMessage(true).apply { + putIntProperty("CountProp", -1) + writeBodyBufferBytes("Test_end".toByteArray()) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } + artemis.producer.send(sourceQueueName, artemisMessage) + + val received5 = receive.next() + val messageID5 = received5.applicationProperties["CountProp"] as Int + assertArrayEquals("Test_end".toByteArray(), received5.payload) + assertEquals(-1, messageID5) // next message should be in order + received5.complete(true) + + amqpServer.stop() + artemisClient.stop() + artemisServer.stop() + } + + @Test + fun `Test legacy bridge still works`() { + // Create local queue + val sourceQueueName = "internal.peers." + ALICE.publicKey.toBase58String() + val (artemisLegacyServer, artemisLegacyClient) = createLegacyArtemis(sourceQueueName) + + + val (artemisServer, artemisClient) = createArtemis(null) + + val artemis = artemisLegacyClient.started!! + for (i in 0 until 3) { + val artemisMessage = artemis.session.createMessage(true).apply { + putIntProperty("CountProp", i) + writeBodyBufferBytes("Test$i".toByteArray()) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } + artemis.producer.send(sourceQueueName, artemisMessage) + } + + + val subs = artemisClient.started!!.session.createConsumer(P2P_QUEUE) + for (i in 0 until 3) { + val msg = subs.receive() + val messageBody = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) } + assertArrayEquals("Test$i".toByteArray(), messageBody) + assertEquals(i, msg.getIntProperty("CountProp")) + } + + artemisClient.stop() + artemisServer.stop() + artemisLegacyClient.stop() + artemisLegacyServer.stop() + + } + + private fun createArtemis(sourceQueueName: String?): Pair { + val artemisConfig = rigorousMock().also { + doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory + doReturn(ALICE_NAME).whenever(it).myLegalName + doReturn("trustpass").whenever(it).trustStorePassword + doReturn("cordacadevpass").whenever(it).keyStorePassword + doReturn("").whenever(it).exportJMXto + doReturn(emptyList()).whenever(it).certificateChainCheckPolicies + doReturn(true).whenever(it).useAMQPBridges + } + artemisConfig.configureWithDevSSLCertificate() + val networkMap = rigorousMock().also { + doReturn(Observable.never()).whenever(it).changed + doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any()) + } + val userService = rigorousMock() + val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE) + val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE) + artemisServer.start() + artemisClient.start() + val artemis = artemisClient.started!! + if (sourceQueueName != null) { + // Local queue for outgoing messages + artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true) + } + return Pair(artemisServer, artemisClient) + } + + private fun createLegacyArtemis(sourceQueueName: String): Pair { + val artemisConfig = rigorousMock().also { + doReturn(temporaryFolder.root.toPath() / "artemis2").whenever(it).baseDirectory + doReturn(BOB_NAME).whenever(it).myLegalName + doReturn("trustpass").whenever(it).trustStorePassword + doReturn("cordacadevpass").whenever(it).keyStorePassword + doReturn("").whenever(it).exportJMXto + doReturn(emptyList()).whenever(it).certificateChainCheckPolicies + doReturn(false).whenever(it).useAMQPBridges + doReturn(ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))).whenever(it).activeMQServer + } + artemisConfig.configureWithDevSSLCertificate() + val networkMap = rigorousMock().also { + doReturn(Observable.never()).whenever(it).changed + doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any()) + } + val userService = rigorousMock() + val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, null, networkMap, userService, MAX_MESSAGE_SIZE) + val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress2, MAX_MESSAGE_SIZE) + artemisServer.start() + artemisClient.start() + val artemis = artemisClient.started!! + // Local queue for outgoing messages + artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true) + return Pair(artemisServer, artemisClient) + } + + private fun createAMQPServer(): AMQPServer { + val serverConfig = rigorousMock().also { + doReturn(temporaryFolder.root.toPath() / "server").whenever(it).baseDirectory + doReturn(BOB_NAME).whenever(it).myLegalName + doReturn("trustpass").whenever(it).trustStorePassword + doReturn("cordacadevpass").whenever(it).keyStorePassword + } + serverConfig.configureWithDevSSLCertificate() + + val serverTruststore = loadKeyStore(serverConfig.trustStoreFile, serverConfig.trustStorePassword) + val serverKeystore = loadKeyStore(serverConfig.sslKeystore, serverConfig.keyStorePassword) + val amqpServer = AMQPServer("0.0.0.0", + amqpPort, + ArtemisMessagingComponent.PEER_USER, + ArtemisMessagingComponent.PEER_USER, + serverKeystore, + serverConfig.keyStorePassword, + serverTruststore, + trace = true) + return amqpServer + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt new file mode 100644 index 0000000000..3f6c8444f7 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -0,0 +1,308 @@ +package net.corda.node.amqp + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import io.netty.channel.EventLoopGroup +import io.netty.channel.nio.NioEventLoopGroup +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.div +import net.corda.core.node.services.NetworkMapCache +import net.corda.core.toFuture +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.internal.protonwrapper.messages.MessageStatus +import net.corda.node.internal.protonwrapper.netty.AMQPClient +import net.corda.node.internal.protonwrapper.netty.AMQPServer +import net.corda.node.internal.security.RPCSecurityManager +import net.corda.node.services.config.CertChainPolicyConfig +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.messaging.ArtemisMessagingClient +import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.nodeapi.internal.crypto.loadKeyStore +import net.corda.testing.* +import org.apache.activemq.artemis.api.core.RoutingType +import org.junit.Assert.assertArrayEquals +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import rx.Observable.never +import kotlin.test.assertEquals + +class ProtonWrapperTests { + @Rule + @JvmField + val temporaryFolder = TemporaryFolder() + + private val serverPort = freePort() + private val serverPort2 = freePort() + private val artemisPort = freePort() + + private abstract class AbstractNodeConfiguration : NodeConfiguration + + @Test + fun `Simple AMPQ Client to Server`() { + val amqpServer = createServer(serverPort) + amqpServer.use { + amqpServer.start() + val receiveSubs = amqpServer.onReceive.subscribe { + assertEquals(BOB_NAME.toString(), it.sourceLegalName) + assertEquals("p2p.inbound", it.topic) + assertEquals("Test", String(it.payload)) + it.complete(true) + } + val amqpClient = createClient() + amqpClient.use { + val serverConnected = amqpServer.onConnection.toFuture() + val clientConnected = amqpClient.onConnection.toFuture() + amqpClient.start() + val serverConnect = serverConnected.get() + assertEquals(true, serverConnect.connected) + assertEquals(BOB_NAME, CordaX500Name.parse(serverConnect.remoteCert!!.subject.toString())) + val clientConnect = clientConnected.get() + assertEquals(true, clientConnect.connected) + assertEquals(ALICE_NAME, CordaX500Name.parse(clientConnect.remoteCert!!.subject.toString())) + val msg = amqpClient.createMessage("Test".toByteArray(), + "p2p.inbound", + ALICE_NAME.toString(), + emptyMap()) + amqpClient.write(msg) + assertEquals(MessageStatus.Acknowledged, msg.onComplete.get()) + receiveSubs.unsubscribe() + } + } + } + + @Test + fun `AMPQ Client refuses to connect to unexpected server`() { + val amqpServer = createServer(serverPort, CordaX500Name("Rogue 1", "London", "GB")) + amqpServer.use { + amqpServer.start() + val amqpClient = createClient() + amqpClient.use { + val clientConnected = amqpClient.onConnection.toFuture() + amqpClient.start() + val clientConnect = clientConnected.get() + assertEquals(false, clientConnect.connected) + } + } + } + + @Test + fun `Client Failover for multiple IP`() { + val amqpServer = createServer(serverPort) + val amqpServer2 = createServer(serverPort2) + val amqpClient = createClient() + try { + val serverConnected = amqpServer.onConnection.toFuture() + val serverConnected2 = amqpServer2.onConnection.toFuture() + val clientConnected = amqpClient.onConnection.toBlocking().iterator + amqpServer.start() + amqpClient.start() + val serverConn1 = serverConnected.get() + assertEquals(true, serverConn1.connected) + assertEquals(BOB_NAME, CordaX500Name.parse(serverConn1.remoteCert!!.subject.toString())) + val connState1 = clientConnected.next() + assertEquals(true, connState1.connected) + assertEquals(ALICE_NAME, CordaX500Name.parse(connState1.remoteCert!!.subject.toString())) + assertEquals(serverPort, connState1.remoteAddress.port) + + // Fail over + amqpServer2.start() + amqpServer.stop() + val connState2 = clientConnected.next() + assertEquals(false, connState2.connected) + assertEquals(serverPort, connState2.remoteAddress.port) + val serverConn2 = serverConnected2.get() + assertEquals(true, serverConn2.connected) + assertEquals(BOB_NAME, CordaX500Name.parse(serverConn2.remoteCert!!.subject.toString())) + val connState3 = clientConnected.next() + assertEquals(true, connState3.connected) + assertEquals(ALICE_NAME, CordaX500Name.parse(connState3.remoteCert!!.subject.toString())) + assertEquals(serverPort2, connState3.remoteAddress.port) + + // Fail back + amqpServer.start() + amqpServer2.stop() + val connState4 = clientConnected.next() + assertEquals(false, connState4.connected) + assertEquals(serverPort2, connState4.remoteAddress.port) + val serverConn3 = serverConnected.get() + assertEquals(true, serverConn3.connected) + assertEquals(BOB_NAME, CordaX500Name.parse(serverConn3.remoteCert!!.subject.toString())) + val connState5 = clientConnected.next() + assertEquals(true, connState5.connected) + assertEquals(ALICE_NAME, CordaX500Name.parse(connState5.remoteCert!!.subject.toString())) + assertEquals(serverPort, connState5.remoteAddress.port) + } finally { + amqpClient.close() + amqpServer.close() + amqpServer2.close() + } + } + + @Test + fun `Send a message from AMQP to Artemis inbox`() { + val (server, artemisClient) = createArtemisServerAndClient() + val amqpClient = createClient() + val clientConnected = amqpClient.onConnection.toFuture() + amqpClient.start() + assertEquals(true, clientConnected.get().connected) + assertEquals(CHARLIE_NAME, CordaX500Name.parse(clientConnected.get().remoteCert!!.subject.toString())) + val artemis = artemisClient.started!! + val sendAddress = "p2p.inbound" + artemis.session.createQueue(sendAddress, RoutingType.MULTICAST, "queue", true) + val consumer = artemis.session.createConsumer("queue") + val testData = "Test".toByteArray() + val testProperty = mutableMapOf() + testProperty["TestProp"] = "1" + val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty) + amqpClient.write(message) + assertEquals(MessageStatus.Acknowledged, message.onComplete.get()) + val received = consumer.receive() + assertEquals("1", received.getStringProperty("TestProp")) + assertArrayEquals(testData, ByteArray(received.bodySize).apply { received.bodyBuffer.readBytes(this) }) + amqpClient.stop() + artemisClient.stop() + server.stop() + } + + @Test + fun `shared AMQPClient threadpool tests`() { + val amqpServer = createServer(serverPort) + amqpServer.use { + val connectionEvents = amqpServer.onConnection.toBlocking().iterator + amqpServer.start() + val sharedThreads = NioEventLoopGroup() + val amqpClient1 = createSharedThreadsClient(sharedThreads, 0) + val amqpClient2 = createSharedThreadsClient(sharedThreads, 1) + amqpClient1.start() + val connection1 = connectionEvents.next() + assertEquals(true, connection1.connected) + val connection1ID = CordaX500Name.parse(connection1.remoteCert!!.subject.toString()) + assertEquals("client 0", connection1ID.organisationUnit) + val source1 = connection1.remoteAddress + amqpClient2.start() + val connection2 = connectionEvents.next() + assertEquals(true, connection2.connected) + val connection2ID = CordaX500Name.parse(connection2.remoteCert!!.subject.toString()) + assertEquals("client 1", connection2ID.organisationUnit) + val source2 = connection2.remoteAddress + // Stopping one shouldn't disconnect the other + amqpClient1.stop() + val connection3 = connectionEvents.next() + assertEquals(false, connection3.connected) + assertEquals(source1, connection3.remoteAddress) + assertEquals(false, amqpClient1.connected) + assertEquals(true, amqpClient2.connected) + // Now shutdown both + amqpClient2.stop() + val connection4 = connectionEvents.next() + assertEquals(false, connection4.connected) + assertEquals(source2, connection4.remoteAddress) + assertEquals(false, amqpClient1.connected) + assertEquals(false, amqpClient2.connected) + // Now restarting one should work + amqpClient1.start() + val connection5 = connectionEvents.next() + assertEquals(true, connection5.connected) + val connection5ID = CordaX500Name.parse(connection5.remoteCert!!.subject.toString()) + assertEquals("client 0", connection5ID.organisationUnit) + assertEquals(true, amqpClient1.connected) + assertEquals(false, amqpClient2.connected) + // Cleanup + amqpClient1.stop() + sharedThreads.shutdownGracefully() + sharedThreads.terminationFuture().sync() + } + } + + private fun createArtemisServerAndClient(): Pair { + val artemisConfig = rigorousMock().also { + doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory + doReturn(CHARLIE_NAME).whenever(it).myLegalName + doReturn("trustpass").whenever(it).trustStorePassword + doReturn("cordacadevpass").whenever(it).keyStorePassword + doReturn("").whenever(it).exportJMXto + doReturn(emptyList()).whenever(it).certificateChainCheckPolicies + doReturn(true).whenever(it).useAMQPBridges + } + artemisConfig.configureWithDevSSLCertificate() + + val networkMap = rigorousMock().also { + doReturn(never()).whenever(it).changed + } + val userService = rigorousMock() + val server = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE) + val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE) + server.start() + client.start() + return Pair(server, client) + } + + private fun createClient(): AMQPClient { + val clientConfig = rigorousMock().also { + doReturn(temporaryFolder.root.toPath() / "client").whenever(it).baseDirectory + doReturn(BOB_NAME).whenever(it).myLegalName + doReturn("trustpass").whenever(it).trustStorePassword + doReturn("cordacadevpass").whenever(it).keyStorePassword + } + clientConfig.configureWithDevSSLCertificate() + + val clientTruststore = loadKeyStore(clientConfig.trustStoreFile, clientConfig.trustStorePassword) + val clientKeystore = loadKeyStore(clientConfig.sslKeystore, clientConfig.keyStorePassword) + val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort), + NetworkHostAndPort("localhost", serverPort2), + NetworkHostAndPort("localhost", artemisPort)), + setOf(ALICE_NAME, CHARLIE_NAME), + PEER_USER, + PEER_USER, + clientKeystore, + clientConfig.keyStorePassword, + clientTruststore, true) + return amqpClient + } + + private fun createSharedThreadsClient(sharedEventGroup: EventLoopGroup, id: Int): AMQPClient { + val clientConfig = rigorousMock().also { + doReturn(temporaryFolder.root.toPath() / "client_%$id").whenever(it).baseDirectory + doReturn(CordaX500Name(null, "client $id", "Corda", "London", null, "GB")).whenever(it).myLegalName + doReturn("trustpass").whenever(it).trustStorePassword + doReturn("cordacadevpass").whenever(it).keyStorePassword + } + clientConfig.configureWithDevSSLCertificate() + + val clientTruststore = loadKeyStore(clientConfig.trustStoreFile, clientConfig.trustStorePassword) + val clientKeystore = loadKeyStore(clientConfig.sslKeystore, clientConfig.keyStorePassword) + val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", serverPort)), + setOf(ALICE_NAME), + PEER_USER, + PEER_USER, + clientKeystore, + clientConfig.keyStorePassword, + clientTruststore, true, sharedEventGroup) + return amqpClient + } + + private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME): AMQPServer { + val serverConfig = rigorousMock().also { + doReturn(temporaryFolder.root.toPath() / "server").whenever(it).baseDirectory + doReturn(name).whenever(it).myLegalName + doReturn("trustpass").whenever(it).trustStorePassword + doReturn("cordacadevpass").whenever(it).keyStorePassword + } + serverConfig.configureWithDevSSLCertificate() + + val serverTruststore = loadKeyStore(serverConfig.trustStoreFile, serverConfig.trustStorePassword) + val serverKeystore = loadKeyStore(serverConfig.sslKeystore, serverConfig.keyStorePassword) + val amqpServer = AMQPServer("0.0.0.0", + port, + PEER_USER, + PEER_USER, + serverKeystore, + serverConfig.keyStorePassword, + serverTruststore) + return amqpServer + } + +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index e87ff6e48b..7d8bf110e1 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -99,7 +99,7 @@ class P2PMessagingTest { } // Wait until the first request is received - crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) + crashingNodes.firstRequestReceived.await() // Stop alice's node after we ensured that the first request was delivered and ignored. alice.dispose() val numberOfRequestsReceived = crashingNodes.requestsReceived.get() @@ -109,8 +109,7 @@ class P2PMessagingTest { // Restart the node and expect a response val aliceRestarted = startAlice() - val response = aliceRestarted.network.onNext(dummyTopic, sessionId).getOrThrow(5.seconds) - + val response = aliceRestarted.network.onNext(dummyTopic, sessionId).getOrThrow() assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived) assertThat(response).isEqualTo(responseMessage) } diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/ConnectionStateMachine.kt new file mode 100644 index 0000000000..8ae8acda99 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -0,0 +1,483 @@ +package net.corda.node.internal.protonwrapper.engine + +import io.netty.buffer.ByteBuf +import io.netty.buffer.PooledByteBufAllocator +import io.netty.buffer.Unpooled +import io.netty.channel.Channel +import io.netty.channel.ChannelHandlerContext +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.debug +import net.corda.node.internal.protonwrapper.messages.MessageStatus +import net.corda.node.internal.protonwrapper.messages.impl.ReceivedMessageImpl +import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl +import org.apache.qpid.proton.Proton +import org.apache.qpid.proton.amqp.Binary +import org.apache.qpid.proton.amqp.Symbol +import org.apache.qpid.proton.amqp.messaging.* +import org.apache.qpid.proton.amqp.messaging.Properties +import org.apache.qpid.proton.amqp.messaging.Target +import org.apache.qpid.proton.amqp.transaction.Coordinator +import org.apache.qpid.proton.amqp.transport.ErrorCondition +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode +import org.apache.qpid.proton.amqp.transport.SenderSettleMode +import org.apache.qpid.proton.engine.* +import org.apache.qpid.proton.message.Message +import org.apache.qpid.proton.message.ProtonJMessage +import org.slf4j.LoggerFactory +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.util.* + +/** + * This ConnectionStateMachine class handles the events generated by the proton-j library to track + * various logical connection, transport and link objects and to drive packet processing. + * It is single threaded per physical SSL connection just like the proton-j library, + * but this threading lock is managed by the EventProcessor class that calls this. + * It ultimately posts application packets to/from from the netty transport pipeline. + */ +internal class ConnectionStateMachine(serverMode: Boolean, + collector: Collector, + private val localLegalName: String, + private val remoteLegalName: String, + userName: String?, + password: String?) : BaseHandler() { + companion object { + private const val IDLE_TIMEOUT = 10000 + } + + val connection: Connection + private val log = LoggerFactory.getLogger(localLegalName) + private val transport: Transport + private val id = UUID.randomUUID().toString() + private var session: Session? = null + private val messageQueues = mutableMapOf>() + private val unackedQueue = LinkedList() + private val receivers = mutableMapOf() + private val senders = mutableMapOf() + private var tagId: Int = 0 + + init { + connection = Engine.connection() + connection.container = "CORDA:$id" + transport = Engine.transport() + transport.idleTimeout = IDLE_TIMEOUT + transport.context = connection + transport.setEmitFlowEventOnSend(true) + connection.collect(collector) + val sasl = transport.sasl() + if (userName != null) { + //TODO This handshake is required for our queue permission logic in Artemis + sasl.setMechanisms("PLAIN") + if (serverMode) { + sasl.server() + sasl.done(Sasl.PN_SASL_OK) + } else { + sasl.plain(userName, password) + sasl.client() + } + } else { + sasl.setMechanisms("ANONYMOUS") + if (serverMode) { + sasl.server() + sasl.done(Sasl.PN_SASL_OK) + } else { + sasl.client() + } + } + transport.bind(connection) + if (!serverMode) { + connection.open() + } + } + + override fun onConnectionInit(event: Event) { + val connection = event.connection + log.debug { "Connection init $connection" } + } + + override fun onConnectionLocalOpen(event: Event) { + val connection = event.connection + log.info("Connection local open $connection") + val session = connection.session() + session.open() + this.session = session + for (target in messageQueues.keys) { + getSender(target) + } + } + + override fun onConnectionLocalClose(event: Event) { + val connection = event.connection + log.info("Connection local close $connection") + connection.close() + connection.free() + } + + override fun onConnectionUnbound(event: Event) { + if (event.connection == this.connection) { + val channel = connection.context as? Channel + if (channel != null) { + if (channel.isActive) { + channel.close() + } + } + } + } + + override fun onConnectionFinal(event: Event) { + val connection = event.connection + log.debug { "Connection final $connection" } + if (connection == this.connection) { + this.connection.context = null + for (queue in messageQueues.values) { + // clear any dead messages + while (true) { + val msg = queue.poll() + if (msg != null) { + msg.doComplete(MessageStatus.Rejected) + msg.release() + } else { + break + } + } + } + messageQueues.clear() + while (true) { + val msg = unackedQueue.poll() + if (msg != null) { + msg.doComplete(MessageStatus.Rejected) + msg.release() + } else { + break + } + } + // shouldn't happen, but close socket channel now if not already done + val channel = connection.context as? Channel + if (channel != null && channel.isActive) { + channel.close() + } + // shouldn't happen, but cleanup any stranded items + transport.context = null + session = null + receivers.clear() + senders.clear() + } + } + + override fun onTransportHeadClosed(event: Event) { + val transport = event.transport + log.debug { "Transport Head Closed $transport" } + transport.close_tail() + } + + override fun onTransportTailClosed(event: Event) { + val transport = event.transport + log.debug { "Transport Tail Closed $transport" } + transport.close_head() + } + + override fun onTransportClosed(event: Event) { + val transport = event.transport + log.debug { "Transport Closed $transport" } + if (transport == this.transport) { + transport.unbind() + transport.free() + transport.context = null + } + } + + override fun onTransportError(event: Event) { + val transport = event.transport + log.info("Transport Error $transport") + val condition = event.transport.condition + if (condition != null) { + log.info("Error: ${condition.description}") + } else { + log.info("Error (no description returned).") + } + } + + override fun onTransport(event: Event) { + val transport = event.transport + log.debug { "Transport $transport" } + onTransportInternal(transport) + } + + private fun onTransportInternal(transport: Transport) { + if (!transport.isClosed) { + val pending = transport.pending() // Note this drives frame generation, which the susbsequent writes push to the socket + if (pending > 0) { + val connection = transport.context as? Connection + val channel = connection?.context as? Channel + channel?.writeAndFlush(transport) + } + } + } + + override fun onSessionInit(event: Event) { + val session = event.session + log.debug { "Session init $session" } + } + + override fun onSessionLocalOpen(event: Event) { + val session = event.session + log.debug { "Session local open $session" } + } + + private fun getSender(target: String): Sender { + if (!senders.containsKey(target)) { + val sender = session!!.sender(UUID.randomUUID().toString()) + sender.source = Source().apply { + address = target + dynamic = false + durable = TerminusDurability.NONE + } + sender.target = Target().apply { + address = target + dynamic = false + durable = TerminusDurability.UNSETTLED_STATE + } + sender.senderSettleMode = SenderSettleMode.UNSETTLED + sender.receiverSettleMode = ReceiverSettleMode.FIRST + senders[target] = sender + sender.open() + } + return senders[target]!! + } + + override fun onSessionLocalClose(event: Event) { + val session = event.session + log.debug { "Session local close $session" } + session.close() + session.free() + } + + override fun onSessionFinal(event: Event) { + val session = event.session + log.debug { "Session final $session" } + if (session == this.session) { + this.session = null + } + } + + override fun onLinkLocalOpen(event: Event) { + val link = event.link + if (link is Sender) { + log.debug { "Sender Link local open ${link.name} ${link.source} ${link.target}" } + senders[link.target.address] = link + transmitMessages(link) + } + if (link is Receiver) { + log.debug { "Receiver Link local open ${link.name} ${link.source} ${link.target}" } + receivers[link.target.address] = link + } + } + + override fun onLinkRemoteOpen(event: Event) { + val link = event.link + if (link is Receiver) { + if (link.remoteTarget is Coordinator) { + log.debug { "Coordinator link received" } + } + } + } + + override fun onLinkFinal(event: Event) { + val link = event.link + if (link is Sender) { + log.debug { "Sender Link final ${link.name} ${link.source} ${link.target}" } + senders.remove(link.target.address) + } + if (link is Receiver) { + log.debug { "Receiver Link final ${link.name} ${link.source} ${link.target}" } + receivers.remove(link.target.address) + } + } + + override fun onLinkFlow(event: Event) { + val link = event.link + if (link is Sender) { + log.debug { "Sender Flow event: ${link.name} ${link.source} ${link.target}" } + if (senders.containsKey(link.target.address)) { + transmitMessages(link) + } + } else if (link is Receiver) { + log.debug { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" } + } + } + + fun processTransport() { + onTransportInternal(transport) + } + + private fun transmitMessages(sender: Sender) { + val messageQueue = messageQueues.getOrPut(sender.target.address, { LinkedList() }) + while (sender.credit > 0) { + log.debug { "Sender credit: ${sender.credit}" } + val nextMessage = messageQueue.poll() + if (nextMessage != null) { + try { + val messageBuf = nextMessage.buf!! + val buf = ByteBuffer.allocate(4) + buf.putInt(tagId++) + val delivery = sender.delivery(buf.array()) + delivery.context = nextMessage + sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes()) + nextMessage.status = MessageStatus.Sent + log.debug { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" } + unackedQueue.offer(nextMessage) + sender.advance() + } finally { + nextMessage.release() + } + } else { + break + } + } + } + + override fun onDelivery(event: Event) { + val delivery = event.delivery + log.debug { "Delivery $delivery" } + val link = delivery.link + if (link is Receiver) { + if (delivery.isReadable && !delivery.isPartial) { + val pending = delivery.pending() + val amqpMessage = decodeAMQPMessage(pending, link) + val payload = (amqpMessage.body as Data).value.array + val connection = event.connection + val channel = connection?.context as? Channel + if (channel != null) { + val appProperties = HashMap(amqpMessage.applicationProperties.value) + appProperties["_AMQ_VALIDATED_USER"] = remoteLegalName + val localAddress = channel.localAddress() as InetSocketAddress + val remoteAddress = channel.remoteAddress() as InetSocketAddress + val receivedMessage = ReceivedMessageImpl( + payload, + link.source.address, + remoteLegalName, + NetworkHostAndPort(localAddress.hostString, localAddress.port), + localLegalName, + NetworkHostAndPort(remoteAddress.hostString, remoteAddress.port), + appProperties, + channel, + delivery) + log.debug { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" } + channel.writeAndFlush(receivedMessage) + if (link.current() == delivery) { + link.advance() + } + } else { + delivery.disposition(Rejected()) + delivery.settle() + } + } + } else if (link is Sender) { + log.debug { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" } + val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance() + val sourceMessage = delivery.context as? SendableMessageImpl + unackedQueue.remove(sourceMessage) + sourceMessage?.doComplete(if (ok) MessageStatus.Acknowledged else MessageStatus.Rejected) + delivery.settle() + } + } + + private fun encodeAMQPMessage(message: ProtonJMessage): ByteBuf { + val buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500) + try { + try { + message.encode(NettyWritable(buffer)) + val bytes = ByteArray(buffer.writerIndex()) + buffer.readBytes(bytes) + return Unpooled.wrappedBuffer(bytes) + } catch (ex: Exception) { + log.error("Unable to encode message as AMQP packet", ex) + throw ex + } + } finally { + buffer.release() + } + } + + private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf { + val message = Proton.message() as ProtonJMessage + message.body = Data(Binary(msg.payload)) + message.properties = Properties() + val appProperties = HashMap(msg.applicationProperties) + //TODO We shouldn't have to do this, but Artemis Server doesn't set the header on AMQP packets. + // Fortunately, when we are bridge to bridge/bridge to float we can authenticate links there. + appProperties["_AMQ_VALIDATED_USER"] = localLegalName + message.applicationProperties = ApplicationProperties(appProperties) + return encodeAMQPMessage(message) + } + + private fun decodeAMQPMessage(pending: Int, link: Receiver): Message { + val msgBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(pending) + try { + link.recv(NettyWritable(msgBuf)) + val amqpMessage = Proton.message() + amqpMessage.decode(msgBuf.array(), msgBuf.arrayOffset() + msgBuf.readerIndex(), msgBuf.readableBytes()) + return amqpMessage + } finally { + msgBuf.release() + } + } + + fun transportWriteMessage(msg: SendableMessageImpl) { + log.debug { "Queue application message write uuid: ${msg.applicationProperties["_AMQ_DUPL_ID"]} ${javax.xml.bind.DatatypeConverter.printHexBinary(msg.payload)}" } + msg.buf = encodePayloadBytes(msg) + val messageQueue = messageQueues.getOrPut(msg.topic, { LinkedList() }) + messageQueue.offer(msg) + if (session != null) { + val sender = getSender(msg.topic) + transmitMessages(sender) + } + } + + fun transportProcessInput(msg: ByteBuf) { + val source = msg.nioBuffer() + try { + do { + val buffer = transport.inputBuffer + val limit = Math.min(buffer.remaining(), source.remaining()) + val duplicate = source.duplicate() + duplicate.limit(source.position() + limit) + buffer.put(duplicate) + transport.processInput().checkIsOk() + source.position(source.position() + limit) + } while (source.hasRemaining()) + } catch (ex: Exception) { + val condition = ErrorCondition() + condition.condition = Symbol.getSymbol("proton:io") + condition.description = ex.message + transport.condition = condition + transport.close_tail() + transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) + } + } + + fun transportProcessOutput(ctx: ChannelHandlerContext) { + try { + var done = false + while (!done) { + val toWrite = transport.outputBuffer + if (toWrite != null && toWrite.hasRemaining()) { + val outbound = ctx.alloc().buffer(toWrite.remaining()) + outbound.writeBytes(toWrite) + ctx.write(outbound) + transport.outputConsumed() + } else { + done = true + } + } + ctx.flush() + } catch (ex: Exception) { + val condition = ErrorCondition() + condition.condition = Symbol.getSymbol("proton:io") + condition.description = ex.message + transport.condition = condition + transport.close_head() + transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/EventProcessor.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/EventProcessor.kt new file mode 100644 index 0000000000..31c7de7aed --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/EventProcessor.kt @@ -0,0 +1,136 @@ +package net.corda.node.internal.protonwrapper.engine + +import io.netty.buffer.ByteBuf +import io.netty.channel.Channel +import io.netty.channel.ChannelHandlerContext +import net.corda.core.utilities.debug +import net.corda.node.internal.protonwrapper.messages.MessageStatus +import net.corda.node.internal.protonwrapper.messages.impl.ReceivedMessageImpl +import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl +import org.apache.qpid.proton.Proton +import org.apache.qpid.proton.amqp.messaging.Accepted +import org.apache.qpid.proton.amqp.messaging.Rejected +import org.apache.qpid.proton.amqp.transport.DeliveryState +import org.apache.qpid.proton.amqp.transport.ErrorCondition +import org.apache.qpid.proton.engine.* +import org.apache.qpid.proton.engine.impl.CollectorImpl +import org.apache.qpid.proton.reactor.FlowController +import org.apache.qpid.proton.reactor.Handshaker +import org.slf4j.LoggerFactory +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +/** + * The EventProcessor class converts calls on the netty scheduler/pipeline + * into proton-j engine event calls into the ConnectionStateMachine. + * It also registers a couple of standard event processors for the basic connection handshake + * and simple sliding window flow control, so that these events don't have to live inside ConnectionStateMachine. + * Everything here is single threaded, because the proton-j library has to be run that way. + */ +internal class EventProcessor(channel: Channel, + serverMode: Boolean, + localLegalName: String, + remoteLegalName: String, + userName: String?, + password: String?) : BaseHandler() { + companion object { + private const val FLOW_WINDOW_SIZE = 10 + } + + private val log = LoggerFactory.getLogger(localLegalName) + private val lock = ReentrantLock() + private var pendingExecute: Boolean = false + private val executor: ScheduledExecutorService = channel.eventLoop() + private val collector = Proton.collector() as CollectorImpl + private val handlers = mutableListOf() + private val stateMachine: ConnectionStateMachine = ConnectionStateMachine(serverMode, + collector, + localLegalName, + remoteLegalName, + userName, + password) + + val connection: Connection = stateMachine.connection + + init { + addHandler(Handshaker()) + addHandler(FlowController(FLOW_WINDOW_SIZE)) + addHandler(stateMachine) + connection.context = channel + tick(stateMachine.connection) + } + + fun addHandler(handler: Handler) = handlers.add(handler) + + private fun popEvent(): Event? { + var ev = collector.peek() + if (ev != null) { + ev = ev.copy() // prevent mutation by collector.pop() + collector.pop() + } + return ev + } + + private fun tick(connection: Connection) { + lock.withLock { + try { + if ((connection.localState != EndpointState.CLOSED) && !connection.transport.isClosed) { + val now = System.currentTimeMillis() + val tickDelay = Math.max(0L, connection.transport.tick(now) - now) + executor.schedule({ tick(connection) }, tickDelay, TimeUnit.MILLISECONDS) + } + } catch (ex: Exception) { + connection.transport.close() + connection.condition = ErrorCondition() + } + } + } + + fun processEvents() { + lock.withLock { + pendingExecute = false + log.debug { "Process Events" } + while (true) { + val ev = popEvent() ?: break + log.debug { "Process event: $ev" } + for (handler in handlers) { + handler.handle(ev) + } + } + stateMachine.processTransport() + log.debug { "Process Events Done" } + } + } + + fun processEventsAsync() { + lock.withLock { + if (!pendingExecute) { + pendingExecute = true + executor.execute { processEvents() } + } + } + } + + fun close() { + if (connection.localState != EndpointState.CLOSED) { + connection.close() + processEvents() + connection.free() + processEvents() + } + } + + fun transportProcessInput(msg: ByteBuf) = lock.withLock { stateMachine.transportProcessInput(msg) } + + fun transportProcessOutput(ctx: ChannelHandlerContext) = lock.withLock { stateMachine.transportProcessOutput(ctx) } + + fun transportWriteMessage(msg: SendableMessageImpl) = lock.withLock { stateMachine.transportWriteMessage(msg) } + + fun complete(completer: ReceivedMessageImpl.MessageCompleter) = lock.withLock { + val status: DeliveryState = if (completer.status == MessageStatus.Acknowledged) Accepted.getInstance() else Rejected() + completer.delivery.disposition(status) + completer.delivery.settle() + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/NettyWritable.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/NettyWritable.kt new file mode 100644 index 0000000000..b8e8085273 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/NettyWritable.kt @@ -0,0 +1,63 @@ +package net.corda.node.internal.protonwrapper.engine + +import io.netty.buffer.ByteBuf +import org.apache.qpid.proton.codec.WritableBuffer +import java.nio.ByteBuffer + +/** + * NettyWritable is a utility class allow proton-j encoders to write directly into a + * netty ByteBuf, without any need to materialize a ByteArray copy. + */ +internal class NettyWritable(val nettyBuffer: ByteBuf) : WritableBuffer { + override fun put(b: Byte) { + nettyBuffer.writeByte(b.toInt()) + } + + override fun putFloat(f: Float) { + nettyBuffer.writeFloat(f) + } + + override fun putDouble(d: Double) { + nettyBuffer.writeDouble(d) + } + + override fun put(src: ByteArray, offset: Int, length: Int) { + nettyBuffer.writeBytes(src, offset, length) + } + + override fun putShort(s: Short) { + nettyBuffer.writeShort(s.toInt()) + } + + override fun putInt(i: Int) { + nettyBuffer.writeInt(i) + } + + override fun putLong(l: Long) { + nettyBuffer.writeLong(l) + } + + override fun hasRemaining(): Boolean { + return nettyBuffer.writerIndex() < nettyBuffer.capacity() + } + + override fun remaining(): Int { + return nettyBuffer.capacity() - nettyBuffer.writerIndex() + } + + override fun position(): Int { + return nettyBuffer.writerIndex() + } + + override fun position(position: Int) { + nettyBuffer.writerIndex(position) + } + + override fun put(payload: ByteBuffer) { + nettyBuffer.writeBytes(payload) + } + + override fun limit(): Int { + return nettyBuffer.capacity() + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/ApplicationMessage.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/ApplicationMessage.kt new file mode 100644 index 0000000000..f90c1b172a --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/ApplicationMessage.kt @@ -0,0 +1,14 @@ +package net.corda.node.internal.protonwrapper.messages + +import net.corda.core.utilities.NetworkHostAndPort + +/** + * Represents a common interface for both sendable and received application messages. + */ +interface ApplicationMessage { + val payload: ByteArray + val topic: String + val destinationLegalName: String + val destinationLink: NetworkHostAndPort + val applicationProperties: Map +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/MessageStatus.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/MessageStatus.kt new file mode 100644 index 0000000000..6b792dd59e --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/MessageStatus.kt @@ -0,0 +1,11 @@ +package net.corda.node.internal.protonwrapper.messages + +/** + * The processing state of a message. + */ +enum class MessageStatus { + Unsent, + Sent, + Acknowledged, + Rejected +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/ReceivedMessage.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/ReceivedMessage.kt new file mode 100644 index 0000000000..df4a1fddff --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/ReceivedMessage.kt @@ -0,0 +1,13 @@ +package net.corda.node.internal.protonwrapper.messages + +import net.corda.core.utilities.NetworkHostAndPort + +/** + * An extension of ApplicationMessage that includes origin information. + */ +interface ReceivedMessage : ApplicationMessage { + val sourceLegalName: String + val sourceLink: NetworkHostAndPort + + fun complete(accepted: Boolean) +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/SendableMessage.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/SendableMessage.kt new file mode 100644 index 0000000000..bc4a405e26 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/SendableMessage.kt @@ -0,0 +1,10 @@ +package net.corda.node.internal.protonwrapper.messages + +import net.corda.core.concurrent.CordaFuture + +/** + * An extension of ApplicationMessage to allow completion signalling. + */ +interface SendableMessage : ApplicationMessage { + val onComplete: CordaFuture +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/impl/ReceivedMessageImpl.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/impl/ReceivedMessageImpl.kt new file mode 100644 index 0000000000..1d5f25d59f --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/impl/ReceivedMessageImpl.kt @@ -0,0 +1,30 @@ +package net.corda.node.internal.protonwrapper.messages.impl + +import io.netty.channel.Channel +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.internal.protonwrapper.messages.MessageStatus +import net.corda.node.internal.protonwrapper.messages.ReceivedMessage +import org.apache.qpid.proton.engine.Delivery + +/** + * An internal packet management class that allows tracking of asynchronous acknowledgements + * that in turn send Delivery messages back to the originator. + */ +internal class ReceivedMessageImpl(override val payload: ByteArray, + override val topic: String, + override val sourceLegalName: String, + override val sourceLink: NetworkHostAndPort, + override val destinationLegalName: String, + override val destinationLink: NetworkHostAndPort, + override val applicationProperties: Map, + private val channel: Channel, + private val delivery: Delivery) : ReceivedMessage { + data class MessageCompleter(val status: MessageStatus, val delivery: Delivery) + + override fun complete(accepted: Boolean) { + val status = if (accepted) MessageStatus.Acknowledged else MessageStatus.Rejected + channel.writeAndFlush(MessageCompleter(status, delivery)) + } + + override fun toString(): String = "Received ${String(payload)} $topic" +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/impl/SendableMessageImpl.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/impl/SendableMessageImpl.kt new file mode 100644 index 0000000000..52dd1ed08f --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/messages/impl/SendableMessageImpl.kt @@ -0,0 +1,37 @@ +package net.corda.node.internal.protonwrapper.messages.impl + +import io.netty.buffer.ByteBuf +import net.corda.core.concurrent.CordaFuture +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.internal.protonwrapper.messages.MessageStatus +import net.corda.node.internal.protonwrapper.messages.SendableMessage + +/** + * An internal packet management class that allows handling of the encoded buffers and + * allows registration of an acknowledgement handler when the remote receiver confirms durable storage. + */ +internal class SendableMessageImpl(override val payload: ByteArray, + override val topic: String, + override val destinationLegalName: String, + override val destinationLink: NetworkHostAndPort, + override val applicationProperties: Map) : SendableMessage { + var buf: ByteBuf? = null + @Volatile + var status: MessageStatus = MessageStatus.Unsent + + private val _onComplete = openFuture() + override val onComplete: CordaFuture get() = _onComplete + + fun release() { + buf?.release() + buf = null + } + + fun doComplete(status: MessageStatus) { + this.status = status + _onComplete.set(status) + } + + override fun toString(): String = "Sendable ${String(payload)} $topic $status" +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/AMQPChannelHandler.kt new file mode 100644 index 0000000000..0f9e6cd607 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -0,0 +1,156 @@ +package net.corda.node.internal.protonwrapper.netty + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelDuplexHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelPromise +import io.netty.channel.socket.SocketChannel +import io.netty.handler.ssl.SslHandler +import io.netty.handler.ssl.SslHandshakeCompletionEvent +import io.netty.util.ReferenceCountUtil +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.toX509CertHolder +import net.corda.core.utilities.debug +import net.corda.node.internal.protonwrapper.engine.EventProcessor +import net.corda.node.internal.protonwrapper.messages.ReceivedMessage +import net.corda.node.internal.protonwrapper.messages.impl.ReceivedMessageImpl +import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl +import org.apache.qpid.proton.engine.ProtonJTransport +import org.apache.qpid.proton.engine.Transport +import org.apache.qpid.proton.engine.impl.ProtocolTracer +import org.apache.qpid.proton.framing.TransportFrame +import org.bouncycastle.cert.X509CertificateHolder +import org.slf4j.LoggerFactory +import java.net.InetSocketAddress + +/** + * An instance of AMQPChannelHandler sits inside the netty pipeline and controls the socket level lifecycle. + * It also add some extra checks to the SSL handshake to support our non-standard certificate checks of legal identity. + * When a valid SSL connections is made then it initialises a proton-j engine instance to handle the protocol layer. + */ +internal class AMQPChannelHandler(private val serverMode: Boolean, + private val allowedRemoteLegalNames: Set?, + private val userName: String?, + private val password: String?, + private val trace: Boolean, + private val onOpen: (Pair) -> Unit, + private val onClose: (Pair) -> Unit, + private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() { + private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler") + private lateinit var remoteAddress: InetSocketAddress + private lateinit var localCert: X509CertificateHolder + private lateinit var remoteCert: X509CertificateHolder + private var eventProcessor: EventProcessor? = null + + override fun channelActive(ctx: ChannelHandlerContext) { + val ch = ctx.channel() + remoteAddress = ch.remoteAddress() as InetSocketAddress + val localAddress = ch.localAddress() as InetSocketAddress + log.info("New client connection ${ch.id()} from ${remoteAddress} to ${localAddress}") + } + + private fun createAMQPEngine(ctx: ChannelHandlerContext) { + val ch = ctx.channel() + eventProcessor = EventProcessor(ch, serverMode, localCert.subject.toString(), remoteCert.subject.toString(), userName, password) + val connection = eventProcessor!!.connection + val transport = connection.transport as ProtonJTransport + if (trace) { + transport.protocolTracer = object : ProtocolTracer { + override fun sentFrame(transportFrame: TransportFrame) { + log.info("${transportFrame.body}") + } + + override fun receivedFrame(transportFrame: TransportFrame) { + log.info("${transportFrame.body}") + } + } + } + ctx.fireChannelActive() + eventProcessor!!.processEventsAsync() + } + + override fun channelInactive(ctx: ChannelHandlerContext) { + val ch = ctx.channel() + log.info("Closed client connection ${ch.id()} from ${remoteAddress} to ${ch.localAddress()}") + onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, null, false))) + eventProcessor?.close() + ctx.fireChannelInactive() + } + + override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { + if (evt is SslHandshakeCompletionEvent) { + if (evt.isSuccess) { + val sslHandler = ctx.pipeline().get(SslHandler::class.java) + localCert = sslHandler.engine().session.localCertificates.first().toX509CertHolder() + remoteCert = sslHandler.engine().session.peerCertificates.first().toX509CertHolder() + try { + val remoteX500Name = CordaX500Name.parse(remoteCert.subject.toString()) + require(allowedRemoteLegalNames == null || remoteX500Name in allowedRemoteLegalNames) + log.info("handshake completed subject: ${remoteX500Name}") + } catch (ex: IllegalArgumentException) { + log.error("Invalid certificate subject", ex) + ctx.close() + return + } + createAMQPEngine(ctx) + onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true))) + } else { + log.error("Handshake failure $evt") + ctx.close() + } + } + } + + override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { + try { + log.debug { "Received $msg" } + if (msg is ByteBuf) { + eventProcessor!!.transportProcessInput(msg) + } + } finally { + ReferenceCountUtil.release(msg) + } + eventProcessor!!.processEventsAsync() + } + + override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) { + try { + try { + log.debug { "Sent $msg" } + when (msg) { + // Transfers application packet into the AMQP engine. + is SendableMessageImpl -> { + val inetAddress = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port) + require(inetAddress == remoteAddress) { + "Message for incorrect endpoint" + } + require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.parse(remoteCert.subject.toString())) { + "Message for incorrect legal identity" + } + log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" } + eventProcessor!!.transportWriteMessage(msg) + } + // A received AMQP packet has been completed and this self-posted packet will be signalled out to the + // external application. + is ReceivedMessage -> { + onReceive(msg) + } + // A general self-posted event that triggers creation of AMQP frames when required. + is Transport -> { + eventProcessor!!.transportProcessOutput(ctx) + } + // A self-posted event that forwards status updates for delivered packets to the application. + is ReceivedMessageImpl.MessageCompleter -> { + eventProcessor!!.complete(msg) + } + } + } catch (ex: Exception) { + log.error("Error in AMQP write processing", ex) + throw ex + } + } finally { + ReferenceCountUtil.release(msg) + } + eventProcessor!!.processEventsAsync() + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/AMQPClient.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/AMQPClient.kt new file mode 100644 index 0000000000..761248797f --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/AMQPClient.kt @@ -0,0 +1,194 @@ +package net.corda.node.internal.protonwrapper.netty + +import io.netty.bootstrap.Bootstrap +import io.netty.channel.* +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.logging.LogLevel +import io.netty.handler.logging.LoggingHandler +import io.netty.util.internal.logging.InternalLoggerFactory +import io.netty.util.internal.logging.Slf4JLoggerFactory +import net.corda.core.identity.CordaX500Name +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.node.internal.protonwrapper.messages.ReceivedMessage +import net.corda.node.internal.protonwrapper.messages.SendableMessage +import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl +import rx.Observable +import rx.subjects.PublishSubject +import java.security.KeyStore +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock +import javax.net.ssl.KeyManagerFactory +import javax.net.ssl.TrustManagerFactory +import kotlin.concurrent.withLock + +/** + * The AMQPClient creates a connection initiator that will try to connect in a round-robin fashion + * to the first open SSL socket. It will keep retrying until it is stopped. + * To allow thread resource control it can accept a shared thread pool as constructor input, + * otherwise it creates a self-contained Netty thraed pool and socket objects. + * Once connected it can accept application packets to send via the AMQP protocol. + */ +class AMQPClient(val targets: List, + val allowedRemoteLegalNames: Set, + private val userName: String?, + private val password: String?, + private val keyStore: KeyStore, + private val keyStorePrivateKeyPassword: String, + private val trustStore: KeyStore, + private val trace: Boolean = false, + private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable { + companion object { + init { + InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE) + } + + val log = contextLogger() + const val RETRY_INTERVAL = 1000L + const val NUM_CLIENT_THREADS = 2 + } + + private val lock = ReentrantLock() + @Volatile + private var stopping: Boolean = false + private var workerGroup: EventLoopGroup? = null + @Volatile + private var clientChannel: Channel? = null + // Offset into the list of targets, so that we can implement round-robin reconnect logic. + private var targetIndex = 0 + private var currentTarget: NetworkHostAndPort = targets.first() + + private val connectListener = object : ChannelFutureListener { + override fun operationComplete(future: ChannelFuture) { + if (!future.isSuccess) { + log.info("Failed to connect to $currentTarget") + + if (!stopping) { + workerGroup?.schedule({ + log.info("Retry connect to $currentTarget") + targetIndex = (targetIndex + 1).rem(targets.size) + restart() + }, RETRY_INTERVAL, TimeUnit.MILLISECONDS) + } + } else { + log.info("Connected to $currentTarget") + // Connection established successfully + clientChannel = future.channel() + clientChannel?.closeFuture()?.addListener(closeListener) + } + } + } + + private val closeListener = object : ChannelFutureListener { + override fun operationComplete(future: ChannelFuture) { + log.info("Disconnected from $currentTarget") + future.channel()?.disconnect() + clientChannel = null + if (!stopping) { + workerGroup?.schedule({ + log.info("Retry connect") + targetIndex = (targetIndex + 1).rem(targets.size) + restart() + }, RETRY_INTERVAL, TimeUnit.MILLISECONDS) + } + } + } + + private class ClientChannelInitializer(val parent: AMQPClient) : ChannelInitializer() { + private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) + private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) + + init { + keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword.toCharArray()) + trustManagerFactory.init(parent.trustStore) + } + + override fun initChannel(ch: SocketChannel) { + val pipeline = ch.pipeline() + val handler = createClientSslHelper(parent.currentTarget, keyManagerFactory, trustManagerFactory) + pipeline.addLast("sslHandler", handler) + if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) + pipeline.addLast(AMQPChannelHandler(false, + parent.allowedRemoteLegalNames, + parent.userName, + parent.password, + parent.trace, + { parent._onConnection.onNext(it.second) }, + { parent._onConnection.onNext(it.second) }, + { rcv -> parent._onReceive.onNext(rcv) })) + } + } + + fun start() { + lock.withLock { + log.info("connect to: $currentTarget") + workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS) + restart() + } + } + + private fun restart() { + val bootstrap = Bootstrap() + // TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux + bootstrap.group(workerGroup). + channel(NioSocketChannel::class.java). + handler(ClientChannelInitializer(this)) + currentTarget = targets[targetIndex] + val clientFuture = bootstrap.connect(currentTarget.host, currentTarget.port) + clientFuture.addListener(connectListener) + } + + fun stop() { + lock.withLock { + log.info("disconnect from: $currentTarget") + stopping = true + try { + if (sharedThreadPool == null) { + workerGroup?.shutdownGracefully() + workerGroup?.terminationFuture()?.sync() + } else { + clientChannel?.close()?.sync() + } + clientChannel = null + workerGroup = null + } finally { + stopping = false + } + log.info("stopped connection to $currentTarget") + } + } + + override fun close() = stop() + + val connected: Boolean + get() { + val channel = lock.withLock { clientChannel } + return channel?.isActive ?: false + } + + fun createMessage(payload: ByteArray, + topic: String, + destinationLegalName: String, + properties: Map): SendableMessage { + return SendableMessageImpl(payload, topic, destinationLegalName, currentTarget, properties) + } + + fun write(msg: SendableMessage) { + val channel = clientChannel + if (channel == null) { + throw IllegalStateException("Connection to $targets not active") + } else { + channel.writeAndFlush(msg) + } + } + + private val _onReceive = PublishSubject.create().toSerialized() + val onReceive: Observable + get() = _onReceive + + private val _onConnection = PublishSubject.create().toSerialized() + val onConnection: Observable + get() = _onConnection +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/AMQPServer.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/AMQPServer.kt new file mode 100644 index 0000000000..6398a6776b --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/AMQPServer.kt @@ -0,0 +1,187 @@ +package net.corda.node.internal.protonwrapper.netty + +import io.netty.bootstrap.ServerBootstrap +import io.netty.channel.Channel +import io.netty.channel.ChannelInitializer +import io.netty.channel.ChannelOption +import io.netty.channel.EventLoopGroup +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioServerSocketChannel +import io.netty.handler.logging.LogLevel +import io.netty.handler.logging.LoggingHandler +import io.netty.util.internal.logging.InternalLoggerFactory +import io.netty.util.internal.logging.Slf4JLoggerFactory +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.node.internal.protonwrapper.messages.ReceivedMessage +import net.corda.node.internal.protonwrapper.messages.SendableMessage +import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl +import org.apache.qpid.proton.engine.Delivery +import rx.Observable +import rx.subjects.PublishSubject +import java.net.BindException +import java.net.InetSocketAddress +import java.security.KeyStore +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantLock +import javax.net.ssl.KeyManagerFactory +import javax.net.ssl.TrustManagerFactory +import kotlin.concurrent.withLock + +/** + * This create a socket acceptor instance that can receive possibly multiple AMQP connections. + * As of now this is not used outside of testing, but in future it will be used for standalone bridging components. + */ +class AMQPServer(val hostName: String, + val port: Int, + private val userName: String?, + private val password: String?, + private val keyStore: KeyStore, + private val keyStorePrivateKeyPassword: String, + private val trustStore: KeyStore, + private val trace: Boolean = false) : AutoCloseable { + + companion object { + init { + InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE) + } + + private val log = contextLogger() + const val NUM_SERVER_THREADS = 4 + } + + private val lock = ReentrantLock() + @Volatile + private var stopping: Boolean = false + private var bossGroup: EventLoopGroup? = null + private var workerGroup: EventLoopGroup? = null + private var serverChannel: Channel? = null + private val clientChannels = ConcurrentHashMap() + + init { + } + + private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer() { + private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) + private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) + + init { + keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword.toCharArray()) + trustManagerFactory.init(parent.trustStore) + } + + override fun initChannel(ch: SocketChannel) { + val pipeline = ch.pipeline() + val handler = createServerSslHelper(keyManagerFactory, trustManagerFactory) + pipeline.addLast("sslHandler", handler) + if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) + pipeline.addLast(AMQPChannelHandler(true, + null, + parent.userName, + parent.password, + parent.trace, + { + parent.clientChannels.put(it.first.remoteAddress(), it.first) + parent._onConnection.onNext(it.second) + }, + { + parent.clientChannels.remove(it.first.remoteAddress()) + parent._onConnection.onNext(it.second) + }, + { rcv -> parent._onReceive.onNext(rcv) })) + } + } + + fun start() { + lock.withLock { + stop() + + bossGroup = NioEventLoopGroup(1) + workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS) + + val server = ServerBootstrap() + // TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux + server.group(bossGroup, workerGroup). + channel(NioServerSocketChannel::class.java). + option(ChannelOption.SO_BACKLOG, 100). + handler(LoggingHandler(LogLevel.INFO)). + childHandler(ServerChannelInitializer(this)) + + log.info("Try to bind $port") + val channelFuture = server.bind(hostName, port).sync() // block/throw here as better to know we failed to claim port than carry on + if (!channelFuture.isDone || !channelFuture.isSuccess) { + throw BindException("Failed to bind port $port") + } + log.info("Listening on port $port") + serverChannel = channelFuture.channel() + } + } + + fun stop() { + lock.withLock { + try { + stopping = true + serverChannel?.apply { close() } + serverChannel = null + + workerGroup?.shutdownGracefully() + workerGroup?.terminationFuture()?.sync() + + bossGroup?.shutdownGracefully() + bossGroup?.terminationFuture()?.sync() + + workerGroup = null + bossGroup = null + } finally { + stopping = false + } + } + } + + override fun close() = stop() + + val listening: Boolean + get() { + val channel = lock.withLock { serverChannel } + return channel?.isActive ?: false + } + + fun createMessage(payload: ByteArray, + topic: String, + destinationLegalName: String, + destinationLink: NetworkHostAndPort, + properties: Map): SendableMessage { + val dest = InetSocketAddress(destinationLink.host, destinationLink.port) + require(dest in clientChannels.keys) { + "Destination not available" + } + return SendableMessageImpl(payload, topic, destinationLegalName, destinationLink, properties) + } + + fun write(msg: SendableMessage) { + val dest = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port) + val channel = clientChannels[dest] + if (channel == null) { + throw IllegalStateException("Connection to ${msg.destinationLink} not active") + } else { + channel.writeAndFlush(msg) + } + } + + fun complete(delivery: Delivery, target: InetSocketAddress) { + val channel = clientChannels[target] + channel?.apply { + writeAndFlush(delivery) + } + } + + private val _onReceive = PublishSubject.create().toSerialized() + val onReceive: Observable + get() = _onReceive + + private val _onConnection = PublishSubject.create().toSerialized() + val onConnection: Observable + get() = _onConnection + +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/ConnectionChange.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/ConnectionChange.kt new file mode 100644 index 0000000000..a576a25d2b --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/ConnectionChange.kt @@ -0,0 +1,6 @@ +package net.corda.node.internal.protonwrapper.netty + +import org.bouncycastle.cert.X509CertificateHolder +import java.net.InetSocketAddress + +data class ConnectionChange(val remoteAddress: InetSocketAddress, val remoteCert: X509CertificateHolder?, val connected: Boolean) \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/SSLHelper.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/SSLHelper.kt new file mode 100644 index 0000000000..7a778d26cf --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/netty/SSLHelper.kt @@ -0,0 +1,39 @@ +package net.corda.node.internal.protonwrapper.netty + +import io.netty.handler.ssl.SslHandler +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.nodeapi.ArtemisTcpTransport +import java.security.SecureRandom +import javax.net.ssl.KeyManagerFactory +import javax.net.ssl.SSLContext +import javax.net.ssl.TrustManagerFactory + +internal fun createClientSslHelper(target: NetworkHostAndPort, + keyManagerFactory: KeyManagerFactory, + trustManagerFactory: TrustManagerFactory): SslHandler { + val sslContext = SSLContext.getInstance("TLS") + val keyManagers = keyManagerFactory.keyManagers + val trustManagers = trustManagerFactory.trustManagers + sslContext.init(keyManagers, trustManagers, SecureRandom()) + val sslEngine = sslContext.createSSLEngine(target.host, target.port) + sslEngine.useClientMode = true + sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray() + sslEngine.enabledCipherSuites = ArtemisTcpTransport.CIPHER_SUITES.toTypedArray() + sslEngine.enableSessionCreation = true + return SslHandler(sslEngine) +} + +internal fun createServerSslHelper(keyManagerFactory: KeyManagerFactory, + trustManagerFactory: TrustManagerFactory): SslHandler { + val sslContext = SSLContext.getInstance("TLS") + val keyManagers = keyManagerFactory.keyManagers + val trustManagers = trustManagerFactory.trustManagers + sslContext.init(keyManagers, trustManagers, SecureRandom()) + val sslEngine = sslContext.createSSLEngine() + sslEngine.useClientMode = false + sslEngine.needClientAuth = true + sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray() + sslEngine.enabledCipherSuites = ArtemisTcpTransport.CIPHER_SUITES.toTypedArray() + sslEngine.enableSessionCreation = true + return SslHandler(sslEngine) +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 7d47ea60cb..60acff95d1 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -6,10 +6,10 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.seconds import net.corda.node.services.messaging.CertificateChainCheckPolicy -import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.config.NodeSSLConfiguration +import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.config.parseAs +import net.corda.nodeapi.internal.persistence.DatabaseConfig import java.net.URL import java.nio.file.Path import java.util.* @@ -42,6 +42,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val detectPublicIp: Boolean get() = true val sshd: SSHDConfiguration? val database: DatabaseConfig + val useAMQPBridges: Boolean get() = true } data class DevModeOptions(val disableCheckpointChecker: Boolean = false) @@ -116,7 +117,8 @@ data class NodeConfigurationImpl( // TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(), override val sshd: SSHDConfiguration? = null, - override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode, exportHibernateJMXStatistics = devMode) + override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode, exportHibernateJMXStatistics = devMode), + override val useAMQPBridges: Boolean = true ) : NodeConfiguration { override val exportJMXto: String get() = "http" diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt b/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt new file mode 100644 index 0000000000..8b285bf999 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt @@ -0,0 +1,203 @@ +package net.corda.node.services.messaging + +import io.netty.channel.EventLoopGroup +import io.netty.channel.nio.NioEventLoopGroup +import net.corda.core.identity.CordaX500Name +import net.corda.core.node.NodeInfo +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.debug +import net.corda.node.internal.protonwrapper.messages.MessageStatus +import net.corda.node.internal.protonwrapper.netty.AMQPClient +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.messaging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.nodeapi.internal.crypto.loadKeyStore +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.slf4j.LoggerFactory +import rx.Subscription +import java.security.KeyStore +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +/** + * The AMQPBridgeManager holds the list of independent AMQPBridge objects that actively ferry messages to remote Artemis + * inboxes. + * The AMQPBridgeManager also provides a single shared connection to Artemis, although each bridge then creates an + * independent Session for message consumption. + * The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager. + */ +internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager { + + private val lock = ReentrantLock() + private val bridgeNameToBridgeMap = mutableMapOf() + private var sharedEventLoopGroup: EventLoopGroup? = null + private val keyStore = loadKeyStore(config.sslKeystore, config.keyStorePassword) + private val keyStorePrivateKeyPassword: String = config.keyStorePassword + private val trustStore = loadKeyStore(config.trustStoreFile, config.trustStorePassword) + private var artemis: ArtemisMessagingClient? = null + + companion object { + private const val NUM_BRIDGE_THREADS = 0 // Default sized pool + } + + /** + * Each AMQPBridge is an independent consumer of messages from the Artemis local queue per designated endpoint. + * It attempts to deliver these messages via an AMQPClient instance to the remote Artemis inbox. + * To prevent race conditions the Artemis session/consumer is only created when the AMQPClient has a stable AMQP connection. + * The acknowledgement and removal of messages from the local queue only occurs if there successful end-to-end delivery. + * If the delivery fails the session is rolled back to prevent loss of the message. This may cause duplicate delivery, + * however Artemis and the remote Corda instanced will deduplicate these messages. + */ + private class AMQPBridge(private val queueName: String, + private val target: NetworkHostAndPort, + private val legalNames: Set, + keyStore: KeyStore, + keyStorePrivateKeyPassword: String, + trustStore: KeyStore, + sharedEventGroup: EventLoopGroup, + private val artemis: ArtemisMessagingClient) { + companion object { + fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort" + } + + private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}") + + val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, sharedThreadPool = sharedEventGroup) + val bridgeName: String get() = getBridgeName(queueName, target) + private val lock = ReentrantLock() // lock to serialise session level access + private var session: ClientSession? = null + private var consumer: ClientConsumer? = null + private var connectedSubscription: Subscription? = null + + fun start() { + log.info("Create new AMQP bridge") + connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) }) + amqpClient.start() + } + + fun stop() { + log.info("Stopping AMQP bridge") + lock.withLock { + synchronized(artemis) { + consumer?.close() + consumer = null + session?.stop() + session = null + } + } + amqpClient.stop() + connectedSubscription?.unsubscribe() + connectedSubscription = null + } + + private fun onSocketConnected(connected: Boolean) { + lock.withLock { + synchronized(artemis) { + if (connected) { + log.info("Bridge Connected") + val sessionFactory = artemis.started!!.sessionFactory + val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, false, false, false, DEFAULT_ACK_BATCH_SIZE) + this.session = session + val consumer = session.createConsumer(queueName) + this.consumer = consumer + consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler) + session.start() + } else { + log.info("Bridge Disconnected") + consumer?.close() + consumer = null + session?.stop() + session = null + } + } + } + } + + private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { + lock.withLock { + val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) } + val properties = HashMap() + for (key in artemisMessage.propertyNames) { + var value = artemisMessage.getObjectProperty(key) + if (value is SimpleString) { + value = value.toString() + } + properties[key.toString()] = value + } + log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } + val sendableMessage = amqpClient.createMessage(data, P2P_QUEUE, + legalNames.first().toString(), + properties) + sendableMessage.onComplete.then { + log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" } + lock.withLock { + if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { + artemisMessage.acknowledge() + session?.commit() + } else { + log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") + session?.rollback(false) + } + } + } + amqpClient.write(sendableMessage) + } + } + } + + private fun gatherAddresses(node: NodeInfo): Sequence { + val address = node.addresses.first() + return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence() + } + + override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set) { + if (bridgeExists(getBridgeName(queueName, target))) { + return + } + val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, sharedEventLoopGroup!!, artemis!!) + lock.withLock { + bridgeNameToBridgeMap[newBridge.bridgeName] = newBridge + } + newBridge.start() + } + + override fun destroyBridges(node: NodeInfo) { + lock.withLock { + gatherAddresses(node).forEach { + val bridge = bridgeNameToBridgeMap.remove(getBridgeName(it.queueName, it.hostAndPort)) + bridge?.stop() + } + } + } + + override fun bridgeExists(bridgeName: String): Boolean = lock.withLock { bridgeNameToBridgeMap.containsKey(bridgeName) } + + override fun start() { + sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS) + val artemis = ArtemisMessagingClient(config, p2pAddress, maxMessageSize) + this.artemis = artemis + artemis.start() + } + + override fun stop() = close() + + override fun close() { + lock.withLock { + for (bridge in bridgeNameToBridgeMap.values) { + bridge.stop() + } + sharedEventLoopGroup?.shutdownGracefully() + sharedEventLoopGroup?.terminationFuture()?.sync() + sharedEventLoopGroup = null + bridgeNameToBridgeMap.clear() + artemis?.stop() + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingClient.kt index 9a160138c1..84dd18298d 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingClient.kt @@ -3,12 +3,15 @@ package net.corda.node.services.messaging import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.internal.config.SSLConfiguration -import org.apache.activemq.artemis.api.core.client.* +import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory class ArtemisMessagingClient(private val config: SSLConfiguration, private val serverAddress: NetworkHostAndPort, private val maxMessageSize: Int) { companion object { @@ -46,7 +49,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration, private val s } fun stop() = synchronized(this) { - started!!.run { + started?.run { producer.close() // Ensure any trailing messages are committed to the journal session.commit() diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 4b80354fd2..285d4ce94c 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -1,6 +1,5 @@ package net.corda.node.services.messaging -import io.netty.handler.ssl.SslHandler import net.corda.core.crypto.AddressFormatException import net.corda.core.crypto.newSecureRandom import net.corda.core.identity.CordaX500Name @@ -24,11 +23,10 @@ import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE -import net.corda.nodeapi.internal.crypto.X509Utilities -import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_CLIENT_TLS -import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA -import net.corda.nodeapi.internal.crypto.loadKeyStore -import net.corda.nodeapi.* +import net.corda.nodeapi.ArtemisTcpTransport +import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.RPCApi +import net.corda.nodeapi.VerifierApi import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER @@ -37,15 +35,17 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress +import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_CLIENT_TLS +import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA +import net.corda.nodeapi.internal.crypto.loadKeyStore import net.corda.nodeapi.internal.requireOnDefaultFileSystem import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl -import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.config.Configuration import org.apache.activemq.artemis.core.config.CoreQueueConfiguration import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration -import org.apache.activemq.artemis.core.remoting.impl.netty.* +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory import org.apache.activemq.artemis.core.security.Role import org.apache.activemq.artemis.core.server.ActiveMQServer import org.apache.activemq.artemis.core.server.SecuritySettingPlugin @@ -53,22 +53,17 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.core.settings.HierarchicalRepository import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy import org.apache.activemq.artemis.core.settings.impl.AddressSettings -import org.apache.activemq.artemis.spi.core.remoting.* import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal -import org.apache.activemq.artemis.utils.ConfigurationHelper import rx.Subscription import java.io.IOException import java.math.BigInteger import java.security.KeyStore import java.security.KeyStoreException import java.security.Principal -import java.time.Duration import java.util.* -import java.util.concurrent.Executor -import java.util.concurrent.ScheduledExecutorService import javax.annotation.concurrent.ThreadSafe import javax.security.auth.Subject import javax.security.auth.callback.CallbackHandler @@ -80,7 +75,6 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE import javax.security.auth.login.FailedLoginException import javax.security.auth.login.LoginException import javax.security.auth.spi.LoginModule -import javax.security.auth.x500.X500Principal import javax.security.cert.CertificateException // TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman. @@ -114,6 +108,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, private lateinit var activeMQServer: ActiveMQServer val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl private var networkChangeHandle: Subscription? = null + private lateinit var bridgeManager: BridgeManager init { config.baseDirectory.requireOnDefaultFileSystem() @@ -133,6 +128,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, } fun stop() = mutex.locked { + bridgeManager.close() networkChangeHandle?.unsubscribe() networkChangeHandle = null activeMQServer.stop() @@ -153,7 +149,14 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) } registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } } + // Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges. + bridgeManager = if (config.useAMQPBridges) { + AMQPBridgeManager(config, NetworkHostAndPort("localhost", p2pPort), maxMessageSize) + } else { + CoreBridgeManager(config, activeMQServer) + } activeMQServer.start() + bridgeManager.start() Node.printBasicNodeInfo("Listening on port", p2pPort.toString()) if (rpcPort != null) { Node.printBasicNodeInfo("RPC service listening on port", rpcPort.toString()) @@ -212,11 +215,13 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, addressFullMessagePolicy = AddressFullMessagePolicy.FAIL } ) - // JMX enablement - if (config.exportJMXto.isNotEmpty()) {isJMXManagementEnabled = true - isJMXUseBrokerName = true} + // JMX enablement + if (config.exportJMXto.isNotEmpty()) { + isJMXManagementEnabled = true + isJMXUseBrokerName = true + } - }.configureAddressSecurity() + }.configureAddressSecurity() private fun queueConfig(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration { @@ -299,7 +304,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, fun deployBridgeToPeer(nodeInfo: NodeInfo) { log.debug("Deploying bridge for $queueName to $nodeInfo") val address = nodeInfo.addresses.first() - deployBridge(queueName, address, nodeInfo.legalIdentitiesAndCerts.map { it.name }.toSet()) + bridgeManager.deployBridge(queueName, address, nodeInfo.legalIdentitiesAndCerts.map { it.name }.toSet()) } if (queueName.startsWith(PEERS_PREFIX)) { @@ -334,147 +339,39 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, fun deployBridges(node: NodeInfo) { gatherAddresses(node) - .filter { queueExists(it.queueName) && !bridgeExists(it.bridgeName) } + .filter { queueExists(it.queueName) && !bridgeManager.bridgeExists(it.bridgeName) } .forEach { deployBridge(it, node.legalIdentitiesAndCerts.map { it.name }.toSet()) } } - fun destroyBridges(node: NodeInfo) { - gatherAddresses(node).forEach { - activeMQServer.destroyBridge(it.bridgeName) - } - } - when (change) { is MapChange.Added -> { deployBridges(change.node) } is MapChange.Removed -> { - destroyBridges(change.node) + bridgeManager.destroyBridges(change.node) } is MapChange.Modified -> { // TODO Figure out what has actually changed and only destroy those bridges that need to be. - destroyBridges(change.previousNode) + bridgeManager.destroyBridges(change.previousNode) deployBridges(change.node) } } } private fun deployBridge(address: ArtemisPeerAddress, legalNames: Set) { - deployBridge(address.queueName, address.hostAndPort, legalNames) + bridgeManager.deployBridge(address.queueName, address.hostAndPort, legalNames) } private fun createTcpTransport(connectionDirection: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true) = ArtemisTcpTransport.tcpTransport(connectionDirection, NetworkHostAndPort(host, port), config, enableSSL = enableSSL) - /** - * All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving - * messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it, - * as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's - * P2P address. - */ - private fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set) { - val connectionDirection = ConnectionDirection.Outbound( - connectorFactoryClassName = VerifyingNettyConnectorFactory::class.java.name, - expectedCommonNames = legalNames - ) - val tcpTransport = createTcpTransport(connectionDirection, target.host, target.port) - tcpTransport.params[ArtemisMessagingServer::class.java.name] = this - // We intentionally overwrite any previous connector config in case the peer legal name changed - activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport) - - activeMQServer.deployBridge(BridgeConfiguration().apply { - name = getBridgeName(queueName, target) - this.queueName = queueName - forwardingAddress = P2P_QUEUE - staticConnectors = listOf(target.toString()) - confirmationWindowSize = 100000 // a guess - isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic - // We keep trying until the network map deems the node unreachable and tells us it's been removed at which - // point we destroy the bridge - retryInterval = config.activeMQServer.bridge.retryIntervalMs - retryIntervalMultiplier = config.activeMQServer.bridge.retryIntervalMultiplier - maxRetryInterval = Duration.ofMinutes(config.activeMQServer.bridge.maxRetryIntervalMin).toMillis() - // As a peer of the target node we must connect to it using the peer user. Actual authentication is done using - // our TLS certificate. - user = PEER_USER - password = PEER_USER - }) - } - private fun queueExists(queueName: String): Boolean = activeMQServer.queueQuery(SimpleString(queueName)).isExists - private fun bridgeExists(bridgeName: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(bridgeName) - private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort) private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort" } -class VerifyingNettyConnectorFactory : NettyConnectorFactory() { - override fun createConnector(configuration: MutableMap, - handler: BufferHandler?, - listener: ClientConnectionLifeCycleListener?, - closeExecutor: Executor?, - threadPool: Executor?, - scheduledThreadPool: ScheduledExecutorService?, - protocolManager: ClientProtocolManager?): Connector { - return VerifyingNettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, - protocolManager) - } -} - -private class VerifyingNettyConnector(configuration: MutableMap, - handler: BufferHandler?, - listener: ClientConnectionLifeCycleListener?, - closeExecutor: Executor?, - threadPool: Executor?, - scheduledThreadPool: ScheduledExecutorService?, - protocolManager: ClientProtocolManager?) : - NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) { - companion object { - private val log = contextLogger() - } - - private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration) - - override fun createConnection(): Connection? { - val connection = super.createConnection() as? NettyConnection - if (sslEnabled && connection != null) { - val expectedLegalNames: Set = uncheckedCast(configuration[ArtemisTcpTransport.VERIFY_PEER_LEGAL_NAME] ?: emptySet()) - try { - val session = connection.channel - .pipeline() - .get(SslHandler::class.java) - .engine() - .session - // Checks the peer name is the one we are expecting. - // TODO Some problems here: after introduction of multiple legal identities on the node and removal of the main one, - // we run into the issue, who are we connecting to. There are some solutions to that: advertise `network identity`; - // have mapping port -> identity (but, design doc says about removing SingleMessageRecipient and having just NetworkHostAndPort, - // it was convenient to store that this way); SNI. - val peerLegalName = CordaX500Name.parse(session.peerPrincipal.name) - val expectedLegalName = expectedLegalNames.singleOrNull { it == peerLegalName } - require(expectedLegalName != null) { - "Peer has wrong CN - expected $expectedLegalNames but got $peerLegalName. This is either a fatal " + - "misconfiguration by the remote peer or an SSL man-in-the-middle attack!" - } - // Make sure certificate has the same name. - val peerCertificateName = CordaX500Name.build(X500Principal(session.peerCertificateChain[0].subjectDN.name)) - require(peerCertificateName == expectedLegalName) { - "Peer has wrong subject name in the certificate - expected $expectedLegalNames but got $peerCertificateName. This is either a fatal " + - "misconfiguration by the remote peer or an SSL man-in-the-middle attack!" - } - X509Utilities.validateCertificateChain(session.localCertificates.last() as java.security.cert.X509Certificate, *session.peerCertificates) - } catch (e: IllegalArgumentException) { - connection.close() - log.error(e.message) - return null - } - } - return connection - } -} - sealed class CertificateChainCheckPolicy { @FunctionalInterface diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/BridgeManager.kt b/node/src/main/kotlin/net/corda/node/services/messaging/BridgeManager.kt new file mode 100644 index 0000000000..5997921848 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/BridgeManager.kt @@ -0,0 +1,20 @@ +package net.corda.node.services.messaging + +import net.corda.core.identity.CordaX500Name +import net.corda.core.node.NodeInfo +import net.corda.core.utilities.NetworkHostAndPort + +/** + * Provides an internal interface that the [ArtemisMessagingServer] delegates to for Bridge activities. + */ +internal interface BridgeManager : AutoCloseable { + fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set) + + fun destroyBridges(node: NodeInfo) + + fun bridgeExists(bridgeName: String): Boolean + + fun start() + + fun stop() +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt new file mode 100644 index 0000000000..4161d89836 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt @@ -0,0 +1,166 @@ +package net.corda.node.services.messaging + +import io.netty.handler.ssl.SslHandler +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.uncheckedCast +import net.corda.core.node.NodeInfo +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.node.services.config.NodeConfiguration +import net.corda.nodeapi.ArtemisTcpTransport +import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.nodeapi.internal.crypto.X509Utilities +import org.apache.activemq.artemis.core.config.BridgeConfiguration +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants +import org.apache.activemq.artemis.core.server.ActiveMQServer +import org.apache.activemq.artemis.spi.core.remoting.* +import org.apache.activemq.artemis.utils.ConfigurationHelper +import java.time.Duration +import java.util.concurrent.Executor +import java.util.concurrent.ScheduledExecutorService +import javax.security.auth.x500.X500Principal + +/** + * This class simply moves the legacy CORE bridge code from [ArtemisMessagingServer] + * into a class implementing [BridgeManager]. + * It has no lifecycle events, because the bridges are internal to the ActiveMQServer instance and thus + * stop when it is stopped. + */ +internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServer: ActiveMQServer) : BridgeManager { + companion object { + private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort" + + private val ArtemisMessagingComponent.ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort) + } + + private fun gatherAddresses(node: NodeInfo): Sequence { + val address = node.addresses.first() + return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence() + } + + + /** + * All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving + * messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it, + * as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's + * P2P address. + */ + override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set) { + val connectionDirection = ConnectionDirection.Outbound( + connectorFactoryClassName = VerifyingNettyConnectorFactory::class.java.name, + expectedCommonNames = legalNames + ) + val tcpTransport = ArtemisTcpTransport.tcpTransport(connectionDirection, target, config, enableSSL = true) + tcpTransport.params[ArtemisMessagingServer::class.java.name] = this + // We intentionally overwrite any previous connector config in case the peer legal name changed + activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport) + + activeMQServer.deployBridge(BridgeConfiguration().apply { + name = getBridgeName(queueName, target) + this.queueName = queueName + forwardingAddress = P2P_QUEUE + staticConnectors = listOf(target.toString()) + confirmationWindowSize = 100000 // a guess + isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic + // We keep trying until the network map deems the node unreachable and tells us it's been removed at which + // point we destroy the bridge + retryInterval = config.activeMQServer.bridge.retryIntervalMs + retryIntervalMultiplier = config.activeMQServer.bridge.retryIntervalMultiplier + maxRetryInterval = Duration.ofMinutes(config.activeMQServer.bridge.maxRetryIntervalMin).toMillis() + // As a peer of the target node we must connect to it using the peer user. Actual authentication is done using + // our TLS certificate. + user = PEER_USER + password = PEER_USER + }) + } + + override fun bridgeExists(bridgeName: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(bridgeName) + + override fun start() { + // Nothing to do + } + + override fun stop() { + // Nothing to do + } + + override fun close() = stop() + + override fun destroyBridges(node: NodeInfo) { + gatherAddresses(node).forEach { + activeMQServer.destroyBridge(it.bridgeName) + } + } +} + +class VerifyingNettyConnectorFactory : NettyConnectorFactory() { + override fun createConnector(configuration: MutableMap, + handler: BufferHandler?, + listener: ClientConnectionLifeCycleListener?, + closeExecutor: Executor?, + threadPool: Executor?, + scheduledThreadPool: ScheduledExecutorService?, + protocolManager: ClientProtocolManager?): Connector { + return VerifyingNettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, + protocolManager) + } + + private class VerifyingNettyConnector(configuration: MutableMap, + handler: BufferHandler?, + listener: ClientConnectionLifeCycleListener?, + closeExecutor: Executor?, + threadPool: Executor?, + scheduledThreadPool: ScheduledExecutorService?, + protocolManager: ClientProtocolManager?) : + NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) { + companion object { + private val log = contextLogger() + } + + private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration) + + override fun createConnection(): Connection? { + val connection = super.createConnection() as? NettyConnection + if (sslEnabled && connection != null) { + val expectedLegalNames: Set = uncheckedCast(configuration[ArtemisTcpTransport.VERIFY_PEER_LEGAL_NAME] ?: emptySet()) + try { + val session = connection.channel + .pipeline() + .get(SslHandler::class.java) + .engine() + .session + // Checks the peer name is the one we are expecting. + // TODO Some problems here: after introduction of multiple legal identities on the node and removal of the main one, + // we run into the issue, who are we connecting to. There are some solutions to that: advertise `network identity`; + // have mapping port -> identity (but, design doc says about removing SingleMessageRecipient and having just NetworkHostAndPort, + // it was convenient to store that this way); SNI. + val peerLegalName = CordaX500Name.parse(session.peerPrincipal.name) + val expectedLegalName = expectedLegalNames.singleOrNull { it == peerLegalName } + require(expectedLegalName != null) { + "Peer has wrong CN - expected $expectedLegalNames but got $peerLegalName. This is either a fatal " + + "misconfiguration by the remote peer or an SSL man-in-the-middle attack!" + } + // Make sure certificate has the same name. + val peerCertificateName = CordaX500Name.build(X500Principal(session.peerCertificateChain[0].subjectDN.name)) + require(peerCertificateName == expectedLegalName) { + "Peer has wrong subject name in the certificate - expected $expectedLegalNames but got $peerCertificateName. This is either a fatal " + + "misconfiguration by the remote peer or an SSL man-in-the-middle attack!" + } + X509Utilities.validateCertificateChain(session.localCertificates.last() as java.security.cert.X509Certificate, *session.peerCertificates) + } catch (e: IllegalArgumentException) { + connection.close() + log.error(e.message) + return null + } + } + return connection + } + } +} + diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 0eb49880ea..fe29913bca 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -25,3 +25,4 @@ activeMQServer = { maxRetryIntervalMin = 3 } } +useAMQPBridges = true \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index e4136ffc24..def1012235 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -5,16 +5,16 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.core.context.AuthServiceId import net.corda.core.crypto.generateKeyPair import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.internal.configureDatabase import net.corda.node.internal.security.RPCSecurityManager import net.corda.node.internal.security.RPCSecurityManagerImpl +import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.network.NetworkMapCacheImpl import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor -import net.corda.node.internal.configureDatabase -import net.corda.node.services.config.CertChainPolicyConfig import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.testing.* @@ -72,6 +72,7 @@ class ArtemisMessagingTests { doReturn("").whenever(it).exportJMXto doReturn(emptyList()).whenever(it).certificateChainCheckPolicies doReturn(5).whenever(it).messageRedeliveryDelaySeconds + doReturn(true).whenever(it).useAMQPBridges } LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock()) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt index c3c510b0b7..f9c699b901 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -27,8 +27,8 @@ import net.corda.core.utilities.seconds import net.corda.node.internal.AbstractNode import net.corda.node.internal.StartedNode import net.corda.node.internal.cordapp.CordappLoader -import net.corda.node.services.api.SchemaService import net.corda.node.services.api.IdentityServiceInternal +import net.corda.node.services.api.SchemaService import net.corda.node.services.config.* import net.corda.node.services.keys.E2ETestKeyManagementService import net.corda.node.services.messaging.MessagingService @@ -37,14 +37,14 @@ import net.corda.node.services.transactions.BFTSMaRt import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor -import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.nodeapi.internal.ServiceIdentityGenerator -import net.corda.nodeapi.internal.NotaryInfo import net.corda.nodeapi.internal.NetworkParametersCopier +import net.corda.nodeapi.internal.NotaryInfo +import net.corda.nodeapi.internal.ServiceIdentityGenerator import net.corda.nodeapi.internal.config.User +import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.DUMMY_NOTARY_NAME +import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.internal.testThreadFactory import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties @@ -495,5 +495,6 @@ private fun mockNodeConfiguration(): NodeConfiguration { doReturn(5).whenever(it).messageRedeliveryDelaySeconds doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(null).whenever(it).devModeOptions + doReturn(true).whenever(it).useAMQPBridges } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 72a5bc8143..a5dff915ca 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -17,7 +17,6 @@ import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.RPCOps import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.security.RPCSecurityManagerImpl -import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.RPCServer import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.ArtemisTcpTransport @@ -47,13 +46,12 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy import org.apache.activemq.artemis.core.settings.impl.AddressSettings -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection +import org.apache.activemq.artemis.spi.core.remoting.Connection import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3 import java.lang.reflect.Method import java.nio.file.Path import java.nio.file.Paths import java.util.* -import javax.security.cert.X509Certificate inline fun RPCDriverDSL.startInVmRpcClient( username: String = rpcTestUser.username, @@ -135,11 +133,11 @@ fun rpcDriver( private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityManager3 { override fun validateUser(user: String?, password: String?) = isValid(user, password) override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet?, checkType: CheckType?) = isValid(user, password) - override fun validateUser(user: String?, password: String?, certificates: Array?): String? { + override fun validateUser(user: String?, password: String?, connection: Connection?): String? { return validate(user, password) } - override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet?, checkType: CheckType?, address: String?, connection: RemotingConnection?): String? { + override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet?, checkType: CheckType?, address: String?, connection: Connection?): String? { return validate(user, password) }