diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 0baa7e8897..72b256eefc 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -17,12 +17,15 @@ import net.corda.testing.internal.rigorousMock import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ClientMessage import org.junit.Assert.assertArrayEquals import org.junit.Ignore import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder import java.util.* +import kotlin.system.measureNanoTime +import kotlin.system.measureTimeMillis import kotlin.test.assertEquals import kotlin.test.assertNotEquals @@ -146,6 +149,72 @@ class AMQPBridgeTest { artemisServer.stop() } + @Test + @Ignore("Run only manually to check the throughput of the AMQP bridge") + fun `AMQP full bridge throughput`() { + val numMessages = 10000 + // Create local queue + val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort() + val (artemisServer, artemisClient, bridgeManager) = createArtemis(sourceQueueName) + + val artemis = artemisClient.started!! + val queueName = ArtemisMessagingComponent.RemoteInboxAddress(BOB.publicKey).queueName + + val (artemisRecServer, artemisRecClient) = createArtemisReceiver(amqpAddress, "artemisBridge") + //artemisBridgeClient.started!!.session.createQueue(SimpleString(queueName), RoutingType.ANYCAST, SimpleString(queueName), true) + + var numReceived = 0 + + artemisRecClient.started!!.session.createQueue(SimpleString(queueName), RoutingType.ANYCAST, SimpleString(queueName), true) + val artemisConsumer = artemisRecClient.started!!.session.createConsumer(queueName) + + val rubbishPayload = ByteArray(10 * 1024) + var timeNanosCreateMessage = 0L + var timeNanosSendMessage = 0L + var timeMillisRead = 0L + val simpleSourceQueueName = SimpleString(sourceQueueName) + val totalTimeMillis = measureTimeMillis { + repeat(numMessages) { + var artemisMessage: ClientMessage? = null + timeNanosCreateMessage += measureNanoTime { + artemisMessage = artemis.session.createMessage(true).apply { + putIntProperty("CountProp", it) + writeBodyBufferBytes(rubbishPayload) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } + } + timeNanosSendMessage += measureNanoTime { + artemis.producer.send(simpleSourceQueueName, artemisMessage, {}) + } + } + artemisClient.started!!.session.commit() + + + timeMillisRead = measureTimeMillis { + while (numReceived < numMessages) { + val current = artemisConsumer.receive() + val messageId = current.getIntProperty("CountProp") + assertEquals(numReceived, messageId) + ++numReceived + current.acknowledge() + } + } + } + println("Creating $numMessages messages took ${timeNanosCreateMessage / (1000 * 1000)} milliseconds") + println("Sending $numMessages messages took ${timeNanosSendMessage / (1000 * 1000)} milliseconds") + println("Receiving $numMessages messages took $timeMillisRead milliseconds") + println("Total took $totalTimeMillis milliseconds") + assertEquals(numMessages, numReceived) + + bridgeManager.stop() + artemisClient.stop() + artemisServer.stop() + artemisRecClient.stop() + artemisRecServer.stop() + } + + private fun createArtemis(sourceQueueName: String?): Triple { val artemisConfig = rigorousMock().also { doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory @@ -159,7 +228,7 @@ class AMQPBridgeTest { } artemisConfig.configureWithDevSSLCertificate() val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE) - val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE) + val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = 10 * 1024) artemisServer.start() artemisClient.start() val bridgeManager = AMQPBridgeManager(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE) @@ -173,6 +242,28 @@ class AMQPBridgeTest { return Triple(artemisServer, artemisClient, bridgeManager) } + private fun createArtemisReceiver(targetAdress: NetworkHostAndPort, workingDir: String): Pair { + val artemisConfig = rigorousMock().also { + doReturn(temporaryFolder.root.toPath() / workingDir).whenever(it).baseDirectory + doReturn(BOB_NAME).whenever(it).myLegalName + doReturn("trustpass").whenever(it).trustStorePassword + doReturn("cordacadevpass").whenever(it).keyStorePassword + doReturn(targetAdress).whenever(it).p2pAddress + doReturn("").whenever(it).jmxMonitoringHttpPort + doReturn(emptyList()).whenever(it).certificateChainCheckPolicies + doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration + } + artemisConfig.configureWithDevSSLCertificate() + val artemisServer = ArtemisMessagingServer(artemisConfig, targetAdress.port, MAX_MESSAGE_SIZE) + val artemisClient = ArtemisMessagingClient(artemisConfig, targetAdress, MAX_MESSAGE_SIZE, confirmationWindowSize = 10 * 1024) + artemisServer.start() + artemisClient.start() + + return Pair(artemisServer, artemisClient) + + } + + private fun createAMQPServer(): AMQPServer { val serverConfig = rigorousMock().also { doReturn(temporaryFolder.root.toPath() / "server").whenever(it).baseDirectory diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 901f39dc26..084877ad82 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -99,26 +99,32 @@ class MessagingExecutor( fun start() { require(executor == null) executor = thread(name = "Messaging executor", isDaemon = true) { - val batch = ArrayList() eventLoop@ while (true) { - batch.add(queue.take()) // Block until at least one job is available. - queue.drainTo(batch) - sendBatchSizeMetric.update(batch.filter { it is Job.Send }.size) - val shouldShutdown = try { - // Try to handle the batch in one commit. - handleBatchTransactional(batch) - } catch (exception: ActiveMQException) { - // A job failed, rollback and do it one at a time, simply log and skip if an individual job fails. - // If a send job fails the exception will be re-raised in the corresponding future. - // Note that this fallback assumes that there are no two jobs in the batch that depend on one - // another. As the exception is re-raised in the requesting calling thread in case of a send, we can - // assume no "in-flight" messages will be sent out of order after failure. - log.warn("Exception while handling transactional batch, falling back to handling one job at a time", exception) - handleBatchOneByOne(batch) - } - batch.clear() - if (shouldShutdown) { - break@eventLoop + val job = queue.take() // Block until at least one job is available. + try { + when (job) { + is Job.Acknowledge -> { + acknowledgeJob(job) + } + is Job.Send -> { + try { + sendJob(job) + } catch (duplicateException: ActiveMQDuplicateIdException) { + log.warn("Message duplication", duplicateException) + job.sentFuture.set(Unit) + } + } + Job.Shutdown -> { + session.commit() + break@eventLoop + } + } + } catch (exception: Throwable) { + log.error("Exception while handling job $job, disregarding", exception) + if (job is Job.Send) { + job.sentFuture.setException(exception) + } + session.rollback() } } } @@ -133,67 +139,6 @@ class MessagingExecutor( } } - /** - * Handles a batch of jobs in one transaction. - * @return true if the executor should shut down, false otherwise. - * @throws ActiveMQException - */ - private fun handleBatchTransactional(batch: List): Boolean { - for (job in batch) { - when (job) { - is Job.Acknowledge -> { - acknowledgeJob(job) - } - is Job.Send -> { - sendJob(job) - } - Job.Shutdown -> { - session.commit() - return true - } - } - } - session.commit() - return false - } - - /** - * Handles a batch of jobs one by one, committing after each. - * @return true if the executor should shut down, false otherwise. - */ - private fun handleBatchOneByOne(batch: List): Boolean { - for (job in batch) { - try { - when (job) { - is Job.Acknowledge -> { - acknowledgeJob(job) - session.commit() - } - is Job.Send -> { - try { - sendJob(job) - session.commit() - } catch (duplicateException: ActiveMQDuplicateIdException) { - log.warn("Message duplication", duplicateException) - job.sentFuture.set(Unit) - } - } - Job.Shutdown -> { - session.commit() - return true - } - } - } catch (exception: Throwable) { - log.error("Exception while handling job $job, disregarding", exception) - if (job is Job.Send) { - job.sentFuture.setException(exception) - } - session.rollback() - } - } - return false - } - private fun sendJob(job: Job.Send) { val mqAddress = resolver.resolveTargetToArtemisQueue(job.target) val artemisMessage = cordaToArtemisMessage(job.message)