mirror of
https://github.com/corda/corda.git
synced 2025-04-19 08:36:39 +00:00
ENT-2513 Add some P2P consumer and producer related metrics (#1419)
This commit is contained in:
parent
802c88e7ad
commit
39878e1966
@ -170,7 +170,7 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
|
||||
val smm = makeStateMachineManager()
|
||||
|
||||
private fun makeStateMachineManager(): StateMachineManager {
|
||||
val executor = MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize)
|
||||
val executor = MultiThreadedStateMachineExecutor(metricRegistry, configuration.enterpriseConfiguration.tuning.flowThreadPoolSize)
|
||||
runOnStop += { executor.shutdown() }
|
||||
return MultiThreadedStateMachineManager(
|
||||
this,
|
||||
|
@ -173,7 +173,7 @@ D""".trimStart()
|
||||
|
||||
private fun makeStateMachineExecutorService(): ExecutorService {
|
||||
log.info("Multi-threaded state machine manager with ${configuration.enterpriseConfiguration.tuning.flowThreadPoolSize} threads.")
|
||||
return MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize)
|
||||
return MultiThreadedStateMachineExecutor(metricRegistry, configuration.enterpriseConfiguration.tuning.flowThreadPoolSize)
|
||||
}
|
||||
|
||||
override fun makeStateMachineManager(): StateMachineManager {
|
||||
|
@ -2,7 +2,9 @@ package net.corda.node.services.messaging
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.SettableFuture
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.codahale.metrics.Timer
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
@ -49,7 +51,8 @@ class MessagingExecutor(
|
||||
data class Send(
|
||||
val message: Message,
|
||||
val target: MessageRecipients,
|
||||
val sentFuture: SettableFuture<Unit>
|
||||
val sentFuture: SettableFuture<Unit>,
|
||||
val timer: Timer.Context
|
||||
) : Job() {
|
||||
override fun toString() = "Send(${message.uniqueMessageId}, target=$target)"
|
||||
}
|
||||
@ -60,8 +63,16 @@ class MessagingExecutor(
|
||||
private var executor: Thread? = null
|
||||
private val cordaVendor = SimpleString(versionInfo.vendor)
|
||||
private val releaseVersion = SimpleString(versionInfo.releaseVersion)
|
||||
private val sendMessageSizeMetric = metricRegistry.histogram("SendMessageSize")
|
||||
private val sendLatencyMetric = metricRegistry.timer("SendLatency")
|
||||
private val sendMessageSizeMetric = metricRegistry.histogram("P2P.SendMessageSize")
|
||||
private val sendLatencyMetric = metricRegistry.timer("P2P.SendLatency")
|
||||
private val sendQueueSizeOnInsert = metricRegistry.histogram("P2P.SendQueueSizeOnInsert")
|
||||
|
||||
init {
|
||||
metricRegistry.register("P2P.SendQueueSize", Gauge<Int> {
|
||||
queue.size
|
||||
})
|
||||
}
|
||||
|
||||
private val ourSenderSeqNo = AtomicLong()
|
||||
|
||||
private companion object {
|
||||
@ -76,15 +87,13 @@ class MessagingExecutor(
|
||||
@Suspendable
|
||||
fun send(message: Message, target: MessageRecipients) {
|
||||
val sentFuture = SettableFuture<Unit>()
|
||||
val job = Job.Send(message, target, sentFuture)
|
||||
val context = sendLatencyMetric.time()
|
||||
val job = Job.Send(message, target, sentFuture, sendLatencyMetric.time())
|
||||
try {
|
||||
sendQueueSizeOnInsert.update(queue.size)
|
||||
queue.put(job)
|
||||
sentFuture.get()
|
||||
} catch (executionException: ExecutionException) {
|
||||
throw executionException.cause!!
|
||||
} finally {
|
||||
context.stop()
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,7 +121,7 @@ class MessagingExecutor(
|
||||
sendJob(job)
|
||||
} catch (duplicateException: ActiveMQDuplicateIdException) {
|
||||
log.warn("Message duplication", duplicateException)
|
||||
job.sentFuture.set(Unit)
|
||||
job.sentFuture.set(Unit) // Intentionally don't stop the timer.
|
||||
}
|
||||
}
|
||||
Job.Shutdown -> {
|
||||
@ -148,7 +157,10 @@ class MessagingExecutor(
|
||||
"Send to: $mqAddress topic: ${job.message.topic} " +
|
||||
"sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}"
|
||||
}
|
||||
producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) })
|
||||
producer.send(SimpleString(mqAddress), artemisMessage, {
|
||||
job.timer.stop()
|
||||
job.sentFuture.set(Unit)
|
||||
})
|
||||
}
|
||||
|
||||
fun cordaToArtemisMessage(message: Message, target: MessageRecipients? = null): ClientMessage? {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.codahale.metrics.Clock
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -56,6 +57,7 @@ import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
@ -414,15 +416,30 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
override fun toString() = "$topic#$data"
|
||||
}
|
||||
|
||||
private val receiverDurationTimer = metricRegistry.timer("P2P.ReceiveDuration")
|
||||
private val receiverIntervalTimer = metricRegistry.timer("P2P.ReceiveInterval")
|
||||
private val receiveTimerClock = Clock.defaultClock()
|
||||
private var lastReceiveTimerTick = receiveTimerClock.tick
|
||||
private val receiveMessageSizeMetric = metricRegistry.histogram("P2P.ReceiveMessageSize")
|
||||
|
||||
fun deliver(artemisMessage: ClientMessage) {
|
||||
artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
|
||||
if (!deduplicator.isDuplicate(cordaMessage)) {
|
||||
deduplicator.signalMessageProcessStart(cordaMessage)
|
||||
deliver(cordaMessage, artemisMessage)
|
||||
} else {
|
||||
log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" }
|
||||
messagingExecutor!!.acknowledge(artemisMessage)
|
||||
val elapsed = receiveTimerClock.tick - lastReceiveTimerTick
|
||||
receiverIntervalTimer.update(elapsed, TimeUnit.NANOSECONDS)
|
||||
receiveMessageSizeMetric.update(artemisMessage.encodeSize)
|
||||
val latency = receiverDurationTimer.time()
|
||||
try {
|
||||
artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
|
||||
if (!deduplicator.isDuplicate(cordaMessage)) {
|
||||
deduplicator.signalMessageProcessStart(cordaMessage)
|
||||
deliver(cordaMessage, artemisMessage)
|
||||
} else {
|
||||
log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" }
|
||||
messagingExecutor!!.acknowledge(artemisMessage)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
latency.stop()
|
||||
lastReceiveTimerTick = receiveTimerClock.tick
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,12 +1,28 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||
import io.netty.util.concurrent.FastThreadLocalThread
|
||||
import java.util.concurrent.PriorityBlockingQueue
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class MultiThreadedStateMachineExecutor(poolSize: Int) : ThreadPoolExecutor(poolSize, poolSize,
|
||||
class MultiThreadedStateMachineExecutor(metricRegistry: MetricRegistry,
|
||||
poolSize: Int) : ThreadPoolExecutor(poolSize, poolSize,
|
||||
0L, TimeUnit.MILLISECONDS,
|
||||
PriorityBlockingQueue<Runnable>(poolSize * 4, FlowStateMachineComparator()),
|
||||
ThreadFactoryBuilder().setNameFormat("flow-worker").setThreadFactory(::FastThreadLocalThread).build())
|
||||
ThreadFactoryBuilder().setNameFormat("flow-worker").setThreadFactory(::FastThreadLocalThread).build()) {
|
||||
|
||||
private val stateMachineQueueSizeOnInsert = metricRegistry.histogram("Flows.QueueSizeOnInsert")
|
||||
|
||||
init {
|
||||
metricRegistry.register("Flows.QueueSize", Gauge<Int> {
|
||||
queue.size
|
||||
})
|
||||
}
|
||||
override fun execute(command: Runnable?) {
|
||||
stateMachineQueueSizeOnInsert.update(queue.size)
|
||||
super.execute(command)
|
||||
}
|
||||
}
|
@ -758,9 +758,9 @@ class MultiThreadedStateMachineManager(
|
||||
private fun makeTransitionExecutor(): TransitionExecutor {
|
||||
val interceptors = ArrayList<TransitionInterceptor>()
|
||||
interceptors.add { HospitalisingInterceptor(flowHospital, it) }
|
||||
interceptors.add { MetricInterceptor(metrics, it) }
|
||||
if (serviceHub.configuration.devMode) {
|
||||
interceptors.add { DumpHistoryOnErrorInterceptor(it) }
|
||||
interceptors.add { MetricInterceptor(metrics, it) }
|
||||
}
|
||||
if (serviceHub.configuration.shouldCheckCheckpoints()) {
|
||||
interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) }
|
||||
|
@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||
import co.paralleluniverse.fibers.FiberScheduler
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.flows.FlowLogic
|
||||
@ -145,7 +146,7 @@ class FlowStateMachineComparatorTest {
|
||||
|
||||
@Test
|
||||
fun `test executor`() {
|
||||
val executor = MultiThreadedStateMachineExecutor(1)
|
||||
val executor = MultiThreadedStateMachineExecutor(MetricRegistry(), 1)
|
||||
val scheduler = FiberExecutorScheduler("TestScheduler", executor)
|
||||
val clock = TestClock(Clock.systemUTC())
|
||||
val blockerLogic = BlockerFlow()
|
||||
|
Loading…
x
Reference in New Issue
Block a user