From 0b134eee86d4c38efa7b1c9b1bd8c5635937d474 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Thu, 8 Nov 2018 17:04:13 +0000 Subject: [PATCH] Improve artemis client shutdown time when disconnected and don't block on commit if Artemis is already closing. --- .../services/artemis/BridgeArtemisConnectionServiceImpl.kt | 2 ++ .../net/corda/nodeapi/internal/ArtemisMessagingClient.kt | 5 +++-- .../kotlin/net/corda/nodeapi/internal}/ClientSessionUtils.kt | 2 +- .../net/corda/node/services/messaging/MessagingExecutor.kt | 1 + .../net/corda/node/services/messaging/P2PMessagingClient.kt | 3 +++ 5 files changed, 10 insertions(+), 3 deletions(-) rename {node/src/main/kotlin/net/corda/node/services/messaging => node-api/src/main/kotlin/net/corda/nodeapi/internal}/ClientSessionUtils.kt (85%) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt index c59b0049a1..4771c3f4c8 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt @@ -66,6 +66,8 @@ class BridgeArtemisConnectionServiceImpl(val conf: FirewallConfiguration, // would be the default and the two lines below can be deleted. connectionTTL = 60000 clientFailureCheckPeriod = 30000 + callFailoverTimeout = 1000 + callTimeout = 1000 minLargeMessageSize = maxMessageSize isUseGlobalPools = nodeSerializationEnv != null confirmationWindowSize = conf.p2pConfirmationWindowSize diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt index 0545eb14af..b51051a08e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt @@ -8,7 +8,6 @@ import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpT import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransportFromList import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration -import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.* import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE @@ -51,6 +50,8 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, // would be the default and the two lines below can be deleted. connectionTTL = 60000 clientFailureCheckPeriod = 30000 + callFailoverTimeout = 1000 + callTimeout = 1000 minLargeMessageSize = maxMessageSize isUseGlobalPools = nodeSerializationEnv != null confirmationWindowSize = this@ArtemisMessagingClient.confirmationWindowSize @@ -81,7 +82,7 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, started?.run { producer.close() // Since we are leaking the session outside of this class it may well be already closed. - if(!session.isClosed) { + if (session.stillOpen()) { // Ensure any trailing messages are committed to the journal session.commit() } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ClientSessionUtils.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ClientSessionUtils.kt similarity index 85% rename from node/src/main/kotlin/net/corda/node/services/messaging/ClientSessionUtils.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/ClientSessionUtils.kt index 85c102d9bb..1b7ae2aea7 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ClientSessionUtils.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ClientSessionUtils.kt @@ -1,4 +1,4 @@ -package net.corda.node.services.messaging +package net.corda.nodeapi.internal import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 6e4090893c..10d21717a1 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -13,6 +13,7 @@ import net.corda.node.VersionInfo import net.corda.node.services.statemachine.FlowMessagingImpl import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders +import net.corda.nodeapi.internal.stillOpen import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException import org.apache.activemq.artemis.api.core.Message.* import org.apache.activemq.artemis.api.core.SimpleString diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index e6ba01c1bb..0a52e4d77b 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -44,6 +44,7 @@ import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.requireMessageSize +import net.corda.nodeapi.internal.stillOpen import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID @@ -202,6 +203,8 @@ class P2PMessagingClient(val config: NodeConfiguration, // would be the default and the two lines below can be deleted. connectionTTL = 60000 clientFailureCheckPeriod = 30000 + callFailoverTimeout = 1000 + callTimeout = 1000 minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE isUseGlobalPools = nodeSerializationEnv != null confirmationWindowSize = config.enterpriseConfiguration.tuning.p2pConfirmationWindowSize