mirror of
https://github.com/corda/corda.git
synced 2025-03-14 00:06:45 +00:00
System.exit node if artemis is closed (#898)
This commit is contained in:
parent
741a5741bd
commit
675059bfa8
@ -20,6 +20,7 @@ import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.statemachine.FlowMessagingImpl
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
@ -128,6 +129,12 @@ class MessagingExecutor(
|
||||
break@eventLoop
|
||||
}
|
||||
}
|
||||
} catch (exception: ActiveMQObjectClosedException) {
|
||||
log.error("Messaging client connection closed", exception)
|
||||
if (job is Job.Send) {
|
||||
job.sentFuture.setException(exception)
|
||||
}
|
||||
System.exit(1)
|
||||
} catch (exception: Throwable) {
|
||||
log.error("Exception while handling job $job, disregarding", exception)
|
||||
if (job is Job.Send) {
|
||||
|
@ -0,0 +1,61 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.codahale.metrics.Timer
|
||||
import com.nhaarman.mockito_kotlin.any
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.statemachine.DeduplicationId
|
||||
import net.corda.testing.node.internal.InMemoryMessage
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.junit.After
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.contrib.java.lang.system.ExpectedSystemExit
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
class MessagingExecutorTest {
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val exit: ExpectedSystemExit = ExpectedSystemExit.none()
|
||||
|
||||
private lateinit var messagingExecutor: MessagingExecutor
|
||||
|
||||
@After
|
||||
fun after() {
|
||||
messagingExecutor.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `System exit node if messaging is closed`() {
|
||||
exit.expectSystemExitWithStatus(1)
|
||||
|
||||
val session = mock<ClientSession>()
|
||||
whenever(session.createMessage(any())).thenReturn(mock())
|
||||
|
||||
val producer = mock<ClientProducer>()
|
||||
whenever(producer.send(any(), any(), any())).thenThrow(ActiveMQObjectClosedException())
|
||||
|
||||
val resolver = mock<AddressToArtemisQueueResolver>()
|
||||
whenever(resolver.resolveTargetToArtemisQueue(any())).thenReturn("address")
|
||||
|
||||
val metricRegistry = mock<MetricRegistry>()
|
||||
val sendLatencyMetric = mock<Timer>()
|
||||
whenever(metricRegistry.timer(any())).thenReturn(sendLatencyMetric)
|
||||
whenever(sendLatencyMetric.time()).thenReturn(mock())
|
||||
whenever(metricRegistry.histogram(any())).thenReturn(mock())
|
||||
|
||||
messagingExecutor = MessagingExecutor(session, producer, VersionInfo.UNKNOWN, resolver, metricRegistry, "ourSenderUUID", 10, "legalName")
|
||||
messagingExecutor.start()
|
||||
|
||||
thread {
|
||||
messagingExecutor.send(InMemoryMessage("topic", OpaqueBytes(ByteArray(10)), DeduplicationId("1")), mock())
|
||||
}.join()
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user