Merge pull request #890 from corda/mnesbit-bridge-artemis-reconnect-bug

ENT-1991: The Bridge Artemis reconnection logic failed in test.
This commit is contained in:
Matthew Nesbit 2018-05-30 11:05:20 +01:00 committed by GitHub
commit 74a3c70ee5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 106 additions and 49 deletions

View File

@ -21,7 +21,6 @@ import net.corda.core.internal.div
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.EnterpriseConfiguration
import net.corda.node.services.config.MutualExclusionConfiguration
import net.corda.node.services.config.NodeConfiguration
@ -30,7 +29,9 @@ import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.zookeeper.ZkClient
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
@ -40,8 +41,7 @@ import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.curator.test.TestingServer
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Assert.*
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
@ -273,6 +273,7 @@ class BridgeIntegrationTest {
config.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val (artemisServer, artemisClient) = createArtemis()
val (artemisServer2, artemisClient2) = createArtemis2()
val (artemisServer3, artemisClient3) = createDummyPeerArtemis()
try {
artemisServer2.start()
artemisClient2.start()
@ -286,6 +287,12 @@ class BridgeIntegrationTest {
assertEquals(true, stateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, serverListening("localhost", 10005))
var bridgeTimeout = 0
while (artemisServer3.serverControl.connectionCount < 2 && bridgeTimeout < 10) { // wait for outgoing bridge to start
++bridgeTimeout
Thread.sleep(1000L)
}
assertTrue(bridgeTimeout < 10)
artemisClient.stop() // Stop artemis to force failover to second choice
artemisServer.stop()
assertEquals(false, stateFollower.next())
@ -301,6 +308,8 @@ class BridgeIntegrationTest {
artemisServer.stop()
artemisClient2.stop()
artemisServer2.stop()
artemisClient3.stop()
artemisServer3.stop()
}
}
@ -323,6 +332,7 @@ class BridgeIntegrationTest {
assertEquals(NetworkHostAndPort("0.0.0.0", 10005), floatConfig.inboundConfig!!.listeningAddress)
val (artemisServer, artemisClient) = createArtemis()
val (artemisServer2, artemisClient2) = createArtemis2()
val (artemisServer3, artemisClient3) = createDummyPeerArtemis()
try {
installBridgeControlResponder(artemisClient)
val bridge = BridgeInstance(bridgeConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
@ -340,6 +350,12 @@ class BridgeIntegrationTest {
assertEquals(true, bridge.active)
assertEquals(true, float.active)
assertEquals(true, serverListening("localhost", 10005)) // now activated
var bridgeTimeout = 0
while (artemisServer3.serverControl.connectionCount < 2 && bridgeTimeout < 10) { // wait for outgoing bridge to start
++bridgeTimeout
Thread.sleep(1000L)
}
assertTrue(bridgeTimeout < 10)
artemisClient.stop() // Stop artemis to force failover to second choice
artemisServer.stop()
assertEquals(false, bridgeStateFollower.next())
@ -372,7 +388,10 @@ class BridgeIntegrationTest {
} finally {
artemisClient.stop()
artemisServer.stop()
artemisClient2.stop()
artemisServer2.stop()
artemisClient3.stop()
artemisServer3.stop()
}
}
@ -385,7 +404,6 @@ class BridgeIntegrationTest {
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(NetworkHostAndPort("localhost", 11005)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 11005), MAX_MESSAGE_SIZE)
@ -409,7 +427,6 @@ class BridgeIntegrationTest {
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(NetworkHostAndPort("localhost", 12005)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 12005), MAX_MESSAGE_SIZE)
@ -417,14 +434,41 @@ class BridgeIntegrationTest {
return Pair(artemisServer, artemisClient)
}
private fun createDummyPeerArtemis(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val originalCertsFolderPath = tempFolder.root.toPath() / "certificates"
val folderPath = tempFolder.root.toPath() / "artemis3"
val newCertsFolderPath = folderPath / "certificates"
newCertsFolderPath.createDirectories()
(originalCertsFolderPath / "truststore.jks").copyToDirectory(newCertsFolderPath)
(originalCertsFolderPath / "sslkeystore.jks").copyToDirectory(newCertsFolderPath)
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(folderPath).whenever(it).baseDirectory
doReturn(DUMMY_BANK_A_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(NetworkHostAndPort("localhost", 7890)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 7890), MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", 7890), MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()
artemisClient.started!!.session.createQueue(SimpleString("${P2P_PREFIX}12345"), RoutingType.ANYCAST, SimpleString("${P2P_PREFIX}12345"), true)
return Pair(artemisServer, artemisClient)
}
private fun installBridgeControlResponder(artemisClient: ArtemisMessagingClient) {
val artemis = artemisClient.started!!
val inboxAddress = SimpleString("${P2P_PREFIX}Test")
val dummyOutQueue = SimpleString("${PEERS_PREFIX}12345")
artemis.session.createQueue(inboxAddress, RoutingType.ANYCAST, inboxAddress, true)
artemis.session.createQueue(dummyOutQueue, RoutingType.ANYCAST, dummyOutQueue, true)
artemis.session.createQueue(BRIDGE_NOTIFY, RoutingType.ANYCAST, BRIDGE_NOTIFY, false)
val controlConsumer = artemis.session.createConsumer(BRIDGE_NOTIFY)
controlConsumer.setMessageHandler { msg ->
val bridgeControl = BridgeControl.NodeToBridgeSnapshot("Test", listOf(inboxAddress.toString()), emptyList())
val outEntry = listOf(BridgeEntry(dummyOutQueue.toString(), listOf(NetworkHostAndPort("localhost", 7890)), listOf(DUMMY_BANK_A_NAME)))
val bridgeControl = BridgeControl.NodeToBridgeSnapshot("Test", listOf(inboxAddress.toString()), outEntry)
val controlPacket = bridgeControl.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val artemisMessage = artemis.session.createMessage(false)
artemisMessage.writeBodyBufferBytes(controlPacket)

View File

@ -151,9 +151,9 @@ class BridgeInstance(val conf: BridgeConfiguration,
}
}
statusFollower = ServiceStateCombiner(listOf(bridgeAuditService, floatSupervisorService, bridgeSupervisorService).filterNotNull())
statusSubscriber = statusFollower!!.activeChange.subscribe {
statusSubscriber = statusFollower!!.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
}
private fun startServices() {

View File

@ -54,13 +54,13 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
startArtemisConnection()
} else {
stopArtemisConnection()
}
}
}, { log.error("Error in state change", it) })
}
private fun startArtemisConnection() {

View File

@ -44,7 +44,7 @@ class SimpleMessageFilterService(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
inboundSession = artemisConnectionService.started!!.sessionFactory.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
inboundProducer = inboundSession!!.createProducer()
@ -55,7 +55,7 @@ class SimpleMessageFilterService(val conf: BridgeConfiguration,
inboundSession = null
}
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
}
override fun stop() {

View File

@ -72,7 +72,7 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
maxMessageSize,
conf.enableAMQPPacketTrace)
onConnectSubscription = server.onConnection.subscribe(_onConnection)
onConnectAuditSubscription = server.onConnection.subscribe {
onConnectAuditSubscription = server.onConnection.subscribe({
if (it.connected) {
auditService.successfulConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Successful AMQP inbound connection")
@ -80,7 +80,7 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
auditService.failedConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Failed AMQP inbound connection")
}
}
}, { log.error("Connection event error", it) })
onReceiveSubscription = server.onReceive.subscribe(_onReceive)
amqpServer = server
server.start()
@ -122,9 +122,9 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
}
override fun stop() {

View File

@ -68,17 +68,17 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
startControlListener()
} else {
stopControlListener()
}
stateHelper.active = it
}
incomingMessageSubscriber = amqpListener.onReceive.subscribe {
}, { log.error("Error in state change", it) })
incomingMessageSubscriber = amqpListener.onReceive.subscribe({
forwardReceivedMessage(it)
}
}, { log.error("Error in state change", it) })
}
private fun startControlListener() {
@ -93,8 +93,8 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
conf.crlCheckSoftFail,
maxMessageSize,
conf.enableAMQPPacketTrace)
connectSubscriber = controlServer.onConnection.subscribe { onConnectToControl(it) }
receiveSubscriber = controlServer.onReceive.subscribe { onControlMessage(it) }
connectSubscriber = controlServer.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
receiveSubscriber = controlServer.onReceive.subscribe({ onControlMessage(it) }, { log.error("Receive event error", it) })
amqpControlServer = controlServer
controlServer.start()
}

View File

@ -40,7 +40,7 @@ class InProcessBridgeReceiverService(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
val keyStoreBytes = sslConfiguration.sslKeystore.readAll()
val trustStoreBytes = sslConfiguration.trustStoreFile.readAll()
@ -55,10 +55,10 @@ class InProcessBridgeReceiverService(val conf: BridgeConfiguration,
}
}
stateHelper.active = it
}
receiveSubscriber = amqpListenerService.onReceive.subscribe {
}, { log.error("Error in state change", it) })
receiveSubscriber = amqpListenerService.onReceive.subscribe({
processMessage(it)
}
}, { log.error("Error in state change", it) })
}
private fun processMessage(receivedMessage: ReceivedMessage) {

View File

@ -70,7 +70,7 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
val floatAddresses = conf.bridgeInnerConfig!!.floatAddresses
val controlClient = AMQPClient(floatAddresses,
@ -83,15 +83,15 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
conf.crlCheckSoftFail,
maxMessageSize,
conf.enableAMQPPacketTrace)
connectSubscriber = controlClient.onConnection.subscribe { onConnectToControl(it) }
receiveSubscriber = controlClient.onReceive.subscribe { onFloatMessage(it) }
connectSubscriber = controlClient.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
receiveSubscriber = controlClient.onReceive.subscribe({ onFloatMessage(it) }, { log.error("Receive event error", it) })
amqpControlClient = controlClient
controlClient.start()
} else {
stateHelper.active = false
closeAMQPClient()
}
}
}, { log.error("Error in state change", it) })
}
private fun closeAMQPClient() {

View File

@ -32,7 +32,6 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
private var connectionSubscriber: Subscription? = null
private var listenerActiveSubscriber: Subscription? = null
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) })
@ -56,11 +55,11 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe { ready ->
statusSubscriber = statusFollower.activeChange.subscribe({ ready ->
if (ready) {
listenerActiveSubscriber = bridgeControlListener.activeChange.subscribe {
listenerActiveSubscriber = bridgeControlListener.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Bridge event error", it) })
bridgeControlListener.start()
auditService.statusChangeEvent("Waiting for activation by at least one bridge control inbox registration")
} else {
@ -69,7 +68,7 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
listenerActiveSubscriber = null
bridgeControlListener.stop()
}
}
}, { log.error("Error in state change", it) })
}
override fun stop() {
@ -77,8 +76,6 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
listenerActiveSubscriber?.unsubscribe()
listenerActiveSubscriber = null
bridgeControlListener.stop()
connectionSubscriber?.unsubscribe()
connectionSubscriber = null
statusSubscriber?.unsubscribe()
statusSubscriber = null
}

View File

@ -58,15 +58,15 @@ class BridgeSupervisorServiceImpl(val conf: BridgeConfiguration,
TunnelingBridgeReceiverService(conf, maxMessageSize, auditService, haService, filterService)
}
statusFollower = ServiceStateCombiner(listOf(haService, senderService, receiverService, filterService))
activeChange.subscribe {
activeChange.subscribe({
consoleLogger.info("BridgeSupervisorService: active = $it")
}
}, { log.error("Error in state change", it) })
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
artemisService.start()
senderService.start()
receiverService.start()

View File

@ -42,15 +42,15 @@ class FloatSupervisorServiceImpl(val conf: BridgeConfiguration,
null
}
statusFollower = ServiceStateCombiner(listOf(amqpListenerService, floatControlService).filterNotNull())
activeChange.subscribe {
activeChange.subscribe({
consoleLogger.info("FloatSupervisorService: active = $it")
}
}, { log.error("Error in state change", it) })
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
amqpListenerService.start()
floatControlService?.start()
}

View File

@ -110,9 +110,17 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
log.info("Stopping AMQP bridge")
lock.withLock {
synchronized(artemis) {
consumer?.close()
consumer?.apply {
if (!isClosed) {
close()
}
}
consumer = null
session?.stop()
session?.apply {
if (!isClosed) {
stop()
}
}
session = null
}
}
@ -135,9 +143,17 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
session.start()
} else {
log.info("Bridge Disconnected")
consumer?.close()
consumer?.apply {
if (!isClosed) {
close()
}
}
consumer = null
session?.stop()
session?.apply {
if (!isClosed) {
stop()
}
}
session = null
}
}