mirror of
https://github.com/corda/corda.git
synced 2025-01-30 16:14:39 +00:00
If the Artemis connectionTTL configuration is not set then some of the cleanup actions do not happen on client kill. This prevents durable messages being replayed (#3351)
and may prevent cleanup of other resources. Undo spurious code
This commit is contained in:
parent
e2701e69d6
commit
4bf5d809a5
@ -38,7 +38,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration,
|
|||||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||||
// would be the default and the two lines below can be deleted.
|
// would be the default and the two lines below can be deleted.
|
||||||
connectionTTL = -1
|
connectionTTL = 60000
|
||||||
clientFailureCheckPeriod = -1
|
clientFailureCheckPeriod = -1
|
||||||
minLargeMessageSize = maxMessageSize
|
minLargeMessageSize = maxMessageSize
|
||||||
isUseGlobalPools = nodeSerializationEnv != null
|
isUseGlobalPools = nodeSerializationEnv != null
|
||||||
@ -49,7 +49,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration,
|
|||||||
// using our TLS certificate.
|
// using our TLS certificate.
|
||||||
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
||||||
// size of 1MB is acknowledged.
|
// size of 1MB is acknowledged.
|
||||||
val session = sessionFactory!!.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
|
val session = sessionFactory!!.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||||
session.start()
|
session.start()
|
||||||
// Create a general purpose producer.
|
// Create a general purpose producer.
|
||||||
val producer = session.createProducer()
|
val producer = session.createProducer()
|
||||||
|
@ -26,7 +26,7 @@ class InternalRPCMessagingClient(val sslConfig: SSLConfiguration, val serverAddr
|
|||||||
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||||
// would be the default and the two lines below can be deleted.
|
// would be the default and the two lines below can be deleted.
|
||||||
connectionTTL = -1
|
connectionTTL = 60000
|
||||||
clientFailureCheckPeriod = -1
|
clientFailureCheckPeriod = -1
|
||||||
minLargeMessageSize = maxMessageSize
|
minLargeMessageSize = maxMessageSize
|
||||||
isUseGlobalPools = nodeSerializationEnv != null
|
isUseGlobalPools = nodeSerializationEnv != null
|
||||||
|
@ -15,11 +15,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
|||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.ByteSequence
|
import net.corda.core.utilities.*
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
|
||||||
import net.corda.core.utilities.OpaqueBytes
|
|
||||||
import net.corda.core.utilities.contextLogger
|
|
||||||
import net.corda.core.utilities.trace
|
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
import net.corda.node.internal.LifecycleSupport
|
import net.corda.node.internal.LifecycleSupport
|
||||||
import net.corda.node.internal.artemis.ReactiveArtemisConsumer
|
import net.corda.node.internal.artemis.ReactiveArtemisConsumer
|
||||||
@ -32,15 +28,12 @@ import net.corda.node.services.statemachine.SenderDeduplicationId
|
|||||||
import net.corda.node.utilities.AffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor
|
||||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
|
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeControl
|
import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
@ -50,12 +43,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
|||||||
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
import org.apache.activemq.artemis.api.core.client.*
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
@ -153,7 +141,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||||
// would be the default and the two lines below can be deleted.
|
// would be the default and the two lines below can be deleted.
|
||||||
connectionTTL = -1
|
connectionTTL = 60000
|
||||||
clientFailureCheckPeriod = -1
|
clientFailureCheckPeriod = -1
|
||||||
minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE
|
minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE
|
||||||
isUseGlobalPools = nodeSerializationEnv != null
|
isUseGlobalPools = nodeSerializationEnv != null
|
||||||
@ -163,7 +151,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
// using our TLS certificate.
|
// using our TLS certificate.
|
||||||
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
||||||
// size of 1MB is acknowledged.
|
// size of 1MB is acknowledged.
|
||||||
val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, locator!!.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) }
|
val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) }
|
||||||
|
|
||||||
producerSession = createNewSession()
|
producerSession = createNewSession()
|
||||||
bridgeSession = createNewSession()
|
bridgeSession = createNewSession()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user