From 4bf5d809a5d11f2933ffdae0430c3e772a7c8642 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 13 Jun 2018 10:32:29 +0100 Subject: [PATCH] If the Artemis connectionTTL configuration is not set then some of the cleanup actions do not happen on client kill. This prevents durable messages being replayed (#3351) and may prevent cleanup of other resources. Undo spurious code --- .../internal/ArtemisMessagingClient.kt | 4 ++-- .../messaging/InternalRPCMessagingClient.kt | 2 +- .../services/messaging/P2PMessagingClient.kt | 22 +++++-------------- 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt index 544f19ef30..cd0fdf555d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt @@ -38,7 +38,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration, val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // would be the default and the two lines below can be deleted. - connectionTTL = -1 + connectionTTL = 60000 clientFailureCheckPeriod = -1 minLargeMessageSize = maxMessageSize isUseGlobalPools = nodeSerializationEnv != null @@ -49,7 +49,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration, // using our TLS certificate. // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer // size of 1MB is acknowledged. - val session = sessionFactory!!.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE) + val session = sessionFactory!!.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) session.start() // Create a general purpose producer. val producer = session.createProducer() diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt index dee6037413..b8d4001a5d 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt @@ -26,7 +26,7 @@ class InternalRPCMessagingClient(val sslConfig: SSLConfiguration, val serverAddr locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // would be the default and the two lines below can be deleted. - connectionTTL = -1 + connectionTTL = 60000 clientFailureCheckPeriod = -1 minLargeMessageSize = maxMessageSize isUseGlobalPools = nodeSerializationEnv != null diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index d10ab3ef7f..435eca5ad0 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -15,11 +15,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.ByteSequence -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.OpaqueBytes -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.trace +import net.corda.core.utilities.* import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport import net.corda.node.internal.artemis.ReactiveArtemisConsumer @@ -32,15 +28,12 @@ import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -50,12 +43,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.ActiveMQClient -import org.apache.activemq.artemis.api.core.client.ClientConsumer -import org.apache.activemq.artemis.api.core.client.ClientMessage -import org.apache.activemq.artemis.api.core.client.ClientProducer -import org.apache.activemq.artemis.api.core.client.ClientSession -import org.apache.activemq.artemis.api.core.client.ServerLocator +import org.apache.activemq.artemis.api.core.client.* import rx.Observable import rx.Subscription import rx.subjects.PublishSubject @@ -153,7 +141,7 @@ class P2PMessagingClient(val config: NodeConfiguration, locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // would be the default and the two lines below can be deleted. - connectionTTL = -1 + connectionTTL = 60000 clientFailureCheckPeriod = -1 minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE isUseGlobalPools = nodeSerializationEnv != null @@ -163,7 +151,7 @@ class P2PMessagingClient(val config: NodeConfiguration, // using our TLS certificate. // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer // size of 1MB is acknowledged. - val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, locator!!.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) } + val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) } producerSession = createNewSession() bridgeSession = createNewSession()