diff --git a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt index e990d0dd9d..43238f2d9e 100644 --- a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt +++ b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt @@ -170,7 +170,7 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri val smm = makeStateMachineManager() private fun makeStateMachineManager(): StateMachineManager { - val executor = MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize) + val executor = MultiThreadedStateMachineExecutor(metricRegistry, configuration.enterpriseConfiguration.tuning.flowThreadPoolSize) runOnStop += { executor.shutdown() } return MultiThreadedStateMachineManager( this, diff --git a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt index 1b40d7500f..4eeb2db805 100644 --- a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt @@ -173,7 +173,7 @@ D""".trimStart() private fun makeStateMachineExecutorService(): ExecutorService { log.info("Multi-threaded state machine manager with ${configuration.enterpriseConfiguration.tuning.flowThreadPoolSize} threads.") - return MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize) + return MultiThreadedStateMachineExecutor(metricRegistry, configuration.enterpriseConfiguration.tuning.flowThreadPoolSize) } override fun makeStateMachineManager(): StateMachineManager { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 5d9e7aa487..68b99e12b7 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -2,7 +2,9 @@ package net.corda.node.services.messaging import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.SettableFuture +import com.codahale.metrics.Gauge import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.Timer import net.corda.core.messaging.MessageRecipients import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug @@ -49,7 +51,8 @@ class MessagingExecutor( data class Send( val message: Message, val target: MessageRecipients, - val sentFuture: SettableFuture + val sentFuture: SettableFuture, + val timer: Timer.Context ) : Job() { override fun toString() = "Send(${message.uniqueMessageId}, target=$target)" } @@ -60,8 +63,16 @@ class MessagingExecutor( private var executor: Thread? = null private val cordaVendor = SimpleString(versionInfo.vendor) private val releaseVersion = SimpleString(versionInfo.releaseVersion) - private val sendMessageSizeMetric = metricRegistry.histogram("SendMessageSize") - private val sendLatencyMetric = metricRegistry.timer("SendLatency") + private val sendMessageSizeMetric = metricRegistry.histogram("P2P.SendMessageSize") + private val sendLatencyMetric = metricRegistry.timer("P2P.SendLatency") + private val sendQueueSizeOnInsert = metricRegistry.histogram("P2P.SendQueueSizeOnInsert") + + init { + metricRegistry.register("P2P.SendQueueSize", Gauge { + queue.size + }) + } + private val ourSenderSeqNo = AtomicLong() private companion object { @@ -76,15 +87,13 @@ class MessagingExecutor( @Suspendable fun send(message: Message, target: MessageRecipients) { val sentFuture = SettableFuture() - val job = Job.Send(message, target, sentFuture) - val context = sendLatencyMetric.time() + val job = Job.Send(message, target, sentFuture, sendLatencyMetric.time()) try { + sendQueueSizeOnInsert.update(queue.size) queue.put(job) sentFuture.get() } catch (executionException: ExecutionException) { throw executionException.cause!! - } finally { - context.stop() } } @@ -112,7 +121,7 @@ class MessagingExecutor( sendJob(job) } catch (duplicateException: ActiveMQDuplicateIdException) { log.warn("Message duplication", duplicateException) - job.sentFuture.set(Unit) + job.sentFuture.set(Unit) // Intentionally don't stop the timer. } } Job.Shutdown -> { @@ -148,7 +157,10 @@ class MessagingExecutor( "Send to: $mqAddress topic: ${job.message.topic} " + "sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}" } - producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) }) + producer.send(SimpleString(mqAddress), artemisMessage, { + job.timer.stop() + job.sentFuture.set(Unit) + }) } fun cordaToArtemisMessage(message: Message, target: MessageRecipients? = null): ClientMessage? { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index a7c106d0f9..e34996c9aa 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -1,6 +1,7 @@ package net.corda.node.services.messaging import co.paralleluniverse.fibers.Suspendable +import com.codahale.metrics.Clock import com.codahale.metrics.MetricRegistry import net.corda.core.crypto.toStringShort import net.corda.core.identity.CordaX500Name @@ -56,6 +57,7 @@ import java.time.Instant import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import javax.annotation.concurrent.ThreadSafe /** @@ -414,15 +416,30 @@ class P2PMessagingClient(val config: NodeConfiguration, override fun toString() = "$topic#$data" } + private val receiverDurationTimer = metricRegistry.timer("P2P.ReceiveDuration") + private val receiverIntervalTimer = metricRegistry.timer("P2P.ReceiveInterval") + private val receiveTimerClock = Clock.defaultClock() + private var lastReceiveTimerTick = receiveTimerClock.tick + private val receiveMessageSizeMetric = metricRegistry.histogram("P2P.ReceiveMessageSize") + fun deliver(artemisMessage: ClientMessage) { - artemisToCordaMessage(artemisMessage)?.let { cordaMessage -> - if (!deduplicator.isDuplicate(cordaMessage)) { - deduplicator.signalMessageProcessStart(cordaMessage) - deliver(cordaMessage, artemisMessage) - } else { - log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" } - messagingExecutor!!.acknowledge(artemisMessage) + val elapsed = receiveTimerClock.tick - lastReceiveTimerTick + receiverIntervalTimer.update(elapsed, TimeUnit.NANOSECONDS) + receiveMessageSizeMetric.update(artemisMessage.encodeSize) + val latency = receiverDurationTimer.time() + try { + artemisToCordaMessage(artemisMessage)?.let { cordaMessage -> + if (!deduplicator.isDuplicate(cordaMessage)) { + deduplicator.signalMessageProcessStart(cordaMessage) + deliver(cordaMessage, artemisMessage) + } else { + log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" } + messagingExecutor!!.acknowledge(artemisMessage) + } } + } finally { + latency.stop() + lastReceiveTimerTick = receiveTimerClock.tick } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineExecutor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineExecutor.kt index 90f4b30d22..02da1a0668 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineExecutor.kt @@ -1,12 +1,28 @@ package net.corda.node.services.statemachine +import com.codahale.metrics.Gauge +import com.codahale.metrics.MetricRegistry import com.google.common.util.concurrent.ThreadFactoryBuilder import io.netty.util.concurrent.FastThreadLocalThread import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit -class MultiThreadedStateMachineExecutor(poolSize: Int) : ThreadPoolExecutor(poolSize, poolSize, +class MultiThreadedStateMachineExecutor(metricRegistry: MetricRegistry, + poolSize: Int) : ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, PriorityBlockingQueue(poolSize * 4, FlowStateMachineComparator()), - ThreadFactoryBuilder().setNameFormat("flow-worker").setThreadFactory(::FastThreadLocalThread).build()) \ No newline at end of file + ThreadFactoryBuilder().setNameFormat("flow-worker").setThreadFactory(::FastThreadLocalThread).build()) { + + private val stateMachineQueueSizeOnInsert = metricRegistry.histogram("Flows.QueueSizeOnInsert") + + init { + metricRegistry.register("Flows.QueueSize", Gauge { + queue.size + }) + } + override fun execute(command: Runnable?) { + stateMachineQueueSizeOnInsert.update(queue.size) + super.execute(command) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index 35ce810d8a..95ec986c19 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -758,9 +758,9 @@ class MultiThreadedStateMachineManager( private fun makeTransitionExecutor(): TransitionExecutor { val interceptors = ArrayList() interceptors.add { HospitalisingInterceptor(flowHospital, it) } + interceptors.add { MetricInterceptor(metrics, it) } if (serviceHub.configuration.devMode) { interceptors.add { DumpHistoryOnErrorInterceptor(it) } - interceptors.add { MetricInterceptor(metrics, it) } } if (serviceHub.configuration.shouldCheckCheckpoints()) { interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparatorTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparatorTest.kt index dc5e81c8e2..389c4124b7 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparatorTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparatorTest.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.FiberExecutorScheduler import co.paralleluniverse.fibers.FiberScheduler import co.paralleluniverse.fibers.Suspendable +import com.codahale.metrics.MetricRegistry import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowLogic @@ -145,7 +146,7 @@ class FlowStateMachineComparatorTest { @Test fun `test executor`() { - val executor = MultiThreadedStateMachineExecutor(1) + val executor = MultiThreadedStateMachineExecutor(MetricRegistry(), 1) val scheduler = FiberExecutorScheduler("TestScheduler", executor) val clock = TestClock(Clock.systemUTC()) val blockerLogic = BlockerFlow()