CORDA-1122 Tweak Artemis for performance (#496)

* ENT-1434 - tweak Artemis for P2P to auto commit and send asynchronously.

* ENT-1434 - tweak Artemis for P2P to auto commit and send asynchronously.

* Fix test compilation
This commit is contained in:
Christian Sailer 2018-03-05 10:47:45 +00:00 committed by GitHub
parent 7c459f3c99
commit e19f9a3841
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 117 additions and 81 deletions

View File

@ -17,12 +17,15 @@ import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.junit.Assert.assertArrayEquals
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.util.*
import kotlin.system.measureNanoTime
import kotlin.system.measureTimeMillis
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
@ -146,6 +149,72 @@ class AMQPBridgeTest {
artemisServer.stop()
}
@Test
@Ignore("Run only manually to check the throughput of the AMQP bridge")
fun `AMQP full bridge throughput`() {
val numMessages = 10000
// Create local queue
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
val (artemisServer, artemisClient, bridgeManager) = createArtemis(sourceQueueName)
val artemis = artemisClient.started!!
val queueName = ArtemisMessagingComponent.RemoteInboxAddress(BOB.publicKey).queueName
val (artemisRecServer, artemisRecClient) = createArtemisReceiver(amqpAddress, "artemisBridge")
//artemisBridgeClient.started!!.session.createQueue(SimpleString(queueName), RoutingType.ANYCAST, SimpleString(queueName), true)
var numReceived = 0
artemisRecClient.started!!.session.createQueue(SimpleString(queueName), RoutingType.ANYCAST, SimpleString(queueName), true)
val artemisConsumer = artemisRecClient.started!!.session.createConsumer(queueName)
val rubbishPayload = ByteArray(10 * 1024)
var timeNanosCreateMessage = 0L
var timeNanosSendMessage = 0L
var timeMillisRead = 0L
val simpleSourceQueueName = SimpleString(sourceQueueName)
val totalTimeMillis = measureTimeMillis {
repeat(numMessages) {
var artemisMessage: ClientMessage? = null
timeNanosCreateMessage += measureNanoTime {
artemisMessage = artemis.session.createMessage(true).apply {
putIntProperty("CountProp", it)
writeBodyBufferBytes(rubbishPayload)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
}
timeNanosSendMessage += measureNanoTime {
artemis.producer.send(simpleSourceQueueName, artemisMessage, {})
}
}
artemisClient.started!!.session.commit()
timeMillisRead = measureTimeMillis {
while (numReceived < numMessages) {
val current = artemisConsumer.receive()
val messageId = current.getIntProperty("CountProp")
assertEquals(numReceived, messageId)
++numReceived
current.acknowledge()
}
}
}
println("Creating $numMessages messages took ${timeNanosCreateMessage / (1000 * 1000)} milliseconds")
println("Sending $numMessages messages took ${timeNanosSendMessage / (1000 * 1000)} milliseconds")
println("Receiving $numMessages messages took $timeMillisRead milliseconds")
println("Total took $totalTimeMillis milliseconds")
assertEquals(numMessages, numReceived)
bridgeManager.stop()
artemisClient.stop()
artemisServer.stop()
artemisRecClient.stop()
artemisRecServer.stop()
}
private fun createArtemis(sourceQueueName: String?): Triple<ArtemisMessagingServer, ArtemisMessagingClient, BridgeManager> {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory
@ -159,7 +228,7 @@ class AMQPBridgeTest {
}
artemisConfig.configureWithDevSSLCertificate()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = 10 * 1024)
artemisServer.start()
artemisClient.start()
val bridgeManager = AMQPBridgeManager(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
@ -173,6 +242,28 @@ class AMQPBridgeTest {
return Triple(artemisServer, artemisClient, bridgeManager)
}
private fun createArtemisReceiver(targetAdress: NetworkHostAndPort, workingDir: String): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / workingDir).whenever(it).baseDirectory
doReturn(BOB_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(targetAdress).whenever(it).p2pAddress
doReturn("").whenever(it).jmxMonitoringHttpPort
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
}
artemisConfig.configureWithDevSSLCertificate()
val artemisServer = ArtemisMessagingServer(artemisConfig, targetAdress.port, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, targetAdress, MAX_MESSAGE_SIZE, confirmationWindowSize = 10 * 1024)
artemisServer.start()
artemisClient.start()
return Pair(artemisServer, artemisClient)
}
private fun createAMQPServer(): AMQPServer {
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "server").whenever(it).baseDirectory

View File

@ -99,26 +99,32 @@ class MessagingExecutor(
fun start() {
require(executor == null)
executor = thread(name = "Messaging executor", isDaemon = true) {
val batch = ArrayList<Job>()
eventLoop@ while (true) {
batch.add(queue.take()) // Block until at least one job is available.
queue.drainTo(batch)
sendBatchSizeMetric.update(batch.filter { it is Job.Send }.size)
val shouldShutdown = try {
// Try to handle the batch in one commit.
handleBatchTransactional(batch)
} catch (exception: ActiveMQException) {
// A job failed, rollback and do it one at a time, simply log and skip if an individual job fails.
// If a send job fails the exception will be re-raised in the corresponding future.
// Note that this fallback assumes that there are no two jobs in the batch that depend on one
// another. As the exception is re-raised in the requesting calling thread in case of a send, we can
// assume no "in-flight" messages will be sent out of order after failure.
log.warn("Exception while handling transactional batch, falling back to handling one job at a time", exception)
handleBatchOneByOne(batch)
}
batch.clear()
if (shouldShutdown) {
break@eventLoop
val job = queue.take() // Block until at least one job is available.
try {
when (job) {
is Job.Acknowledge -> {
acknowledgeJob(job)
}
is Job.Send -> {
try {
sendJob(job)
} catch (duplicateException: ActiveMQDuplicateIdException) {
log.warn("Message duplication", duplicateException)
job.sentFuture.set(Unit)
}
}
Job.Shutdown -> {
session.commit()
break@eventLoop
}
}
} catch (exception: Throwable) {
log.error("Exception while handling job $job, disregarding", exception)
if (job is Job.Send) {
job.sentFuture.setException(exception)
}
session.rollback()
}
}
}
@ -133,67 +139,6 @@ class MessagingExecutor(
}
}
/**
* Handles a batch of jobs in one transaction.
* @return true if the executor should shut down, false otherwise.
* @throws ActiveMQException
*/
private fun handleBatchTransactional(batch: List<Job>): Boolean {
for (job in batch) {
when (job) {
is Job.Acknowledge -> {
acknowledgeJob(job)
}
is Job.Send -> {
sendJob(job)
}
Job.Shutdown -> {
session.commit()
return true
}
}
}
session.commit()
return false
}
/**
* Handles a batch of jobs one by one, committing after each.
* @return true if the executor should shut down, false otherwise.
*/
private fun handleBatchOneByOne(batch: List<Job>): Boolean {
for (job in batch) {
try {
when (job) {
is Job.Acknowledge -> {
acknowledgeJob(job)
session.commit()
}
is Job.Send -> {
try {
sendJob(job)
session.commit()
} catch (duplicateException: ActiveMQDuplicateIdException) {
log.warn("Message duplication", duplicateException)
job.sentFuture.set(Unit)
}
}
Job.Shutdown -> {
session.commit()
return true
}
}
} catch (exception: Throwable) {
log.error("Exception while handling job $job, disregarding", exception)
if (job is Job.Send) {
job.sentFuture.setException(exception)
}
session.rollback()
}
}
return false
}
private fun sendJob(job: Job.Send) {
val mqAddress = resolver.resolveTargetToArtemisQueue(job.target)
val artemisMessage = cordaToArtemisMessage(job.message)