The Artemis reconnection logic failed in test, because there were active artemis bridge connections. Correct the integration test, fix the bridging code and log all onError cases in the bridge.

This commit is contained in:
Matthew Nesbit 2018-05-29 18:14:35 +01:00
parent 687b6080af
commit f98bf4db14
12 changed files with 106 additions and 49 deletions

View File

@ -21,7 +21,6 @@ import net.corda.core.internal.div
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.EnterpriseConfiguration
import net.corda.node.services.config.MutualExclusionConfiguration
import net.corda.node.services.config.NodeConfiguration
@ -30,7 +29,9 @@ import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.zookeeper.ZkClient
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
@ -40,8 +41,7 @@ import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.curator.test.TestingServer
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Assert.*
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
@ -273,6 +273,7 @@ class BridgeIntegrationTest {
config.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val (artemisServer, artemisClient) = createArtemis()
val (artemisServer2, artemisClient2) = createArtemis2()
val (artemisServer3, artemisClient3) = createDummyPeerArtemis()
try {
artemisServer2.start()
artemisClient2.start()
@ -286,6 +287,12 @@ class BridgeIntegrationTest {
assertEquals(true, stateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, serverListening("localhost", 10005))
var bridgeTimeout = 0
while (artemisServer3.serverControl.connectionCount < 2 && bridgeTimeout < 10) { // wait for outgoing bridge to start
++bridgeTimeout
Thread.sleep(1000L)
}
assertTrue(bridgeTimeout < 10)
artemisClient.stop() // Stop artemis to force failover to second choice
artemisServer.stop()
assertEquals(false, stateFollower.next())
@ -301,6 +308,8 @@ class BridgeIntegrationTest {
artemisServer.stop()
artemisClient2.stop()
artemisServer2.stop()
artemisClient3.stop()
artemisServer3.stop()
}
}
@ -323,6 +332,7 @@ class BridgeIntegrationTest {
assertEquals(NetworkHostAndPort("0.0.0.0", 10005), floatConfig.inboundConfig!!.listeningAddress)
val (artemisServer, artemisClient) = createArtemis()
val (artemisServer2, artemisClient2) = createArtemis2()
val (artemisServer3, artemisClient3) = createDummyPeerArtemis()
try {
installBridgeControlResponder(artemisClient)
val bridge = BridgeInstance(bridgeConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
@ -340,6 +350,12 @@ class BridgeIntegrationTest {
assertEquals(true, bridge.active)
assertEquals(true, float.active)
assertEquals(true, serverListening("localhost", 10005)) // now activated
var bridgeTimeout = 0
while (artemisServer3.serverControl.connectionCount < 2 && bridgeTimeout < 10) { // wait for outgoing bridge to start
++bridgeTimeout
Thread.sleep(1000L)
}
assertTrue(bridgeTimeout < 10)
artemisClient.stop() // Stop artemis to force failover to second choice
artemisServer.stop()
assertEquals(false, bridgeStateFollower.next())
@ -372,7 +388,10 @@ class BridgeIntegrationTest {
} finally {
artemisClient.stop()
artemisServer.stop()
artemisClient2.stop()
artemisServer2.stop()
artemisClient3.stop()
artemisServer3.stop()
}
}
@ -385,7 +404,6 @@ class BridgeIntegrationTest {
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(NetworkHostAndPort("localhost", 11005)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 11005), MAX_MESSAGE_SIZE)
@ -409,7 +427,6 @@ class BridgeIntegrationTest {
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(NetworkHostAndPort("localhost", 12005)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 12005), MAX_MESSAGE_SIZE)
@ -417,14 +434,41 @@ class BridgeIntegrationTest {
return Pair(artemisServer, artemisClient)
}
private fun createDummyPeerArtemis(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val originalCertsFolderPath = tempFolder.root.toPath() / "certificates"
val folderPath = tempFolder.root.toPath() / "artemis3"
val newCertsFolderPath = folderPath / "certificates"
newCertsFolderPath.createDirectories()
(originalCertsFolderPath / "truststore.jks").copyToDirectory(newCertsFolderPath)
(originalCertsFolderPath / "sslkeystore.jks").copyToDirectory(newCertsFolderPath)
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(folderPath).whenever(it).baseDirectory
doReturn(DUMMY_BANK_A_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(NetworkHostAndPort("localhost", 7890)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 7890), MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", 7890), MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()
artemisClient.started!!.session.createQueue(SimpleString("${P2P_PREFIX}12345"), RoutingType.ANYCAST, SimpleString("${P2P_PREFIX}12345"), true)
return Pair(artemisServer, artemisClient)
}
private fun installBridgeControlResponder(artemisClient: ArtemisMessagingClient) {
val artemis = artemisClient.started!!
val inboxAddress = SimpleString("${P2P_PREFIX}Test")
val dummyOutQueue = SimpleString("${PEERS_PREFIX}12345")
artemis.session.createQueue(inboxAddress, RoutingType.ANYCAST, inboxAddress, true)
artemis.session.createQueue(dummyOutQueue, RoutingType.ANYCAST, dummyOutQueue, true)
artemis.session.createQueue(BRIDGE_NOTIFY, RoutingType.ANYCAST, BRIDGE_NOTIFY, false)
val controlConsumer = artemis.session.createConsumer(BRIDGE_NOTIFY)
controlConsumer.setMessageHandler { msg ->
val bridgeControl = BridgeControl.NodeToBridgeSnapshot("Test", listOf(inboxAddress.toString()), emptyList())
val outEntry = listOf(BridgeEntry(dummyOutQueue.toString(), listOf(NetworkHostAndPort("localhost", 7890)), listOf(DUMMY_BANK_A_NAME)))
val bridgeControl = BridgeControl.NodeToBridgeSnapshot("Test", listOf(inboxAddress.toString()), outEntry)
val controlPacket = bridgeControl.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val artemisMessage = artemis.session.createMessage(false)
artemisMessage.writeBodyBufferBytes(controlPacket)

View File

@ -151,9 +151,9 @@ class BridgeInstance(val conf: BridgeConfiguration,
}
}
statusFollower = ServiceStateCombiner(listOf(bridgeAuditService, floatSupervisorService, bridgeSupervisorService).filterNotNull())
statusSubscriber = statusFollower!!.activeChange.subscribe {
statusSubscriber = statusFollower!!.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
}
private fun startServices() {

View File

@ -54,13 +54,13 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
startArtemisConnection()
} else {
stopArtemisConnection()
}
}
}, { log.error("Error in state change", it) })
}
private fun startArtemisConnection() {

View File

@ -44,7 +44,7 @@ class SimpleMessageFilterService(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
inboundSession = artemisConnectionService.started!!.sessionFactory.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
inboundProducer = inboundSession!!.createProducer()
@ -55,7 +55,7 @@ class SimpleMessageFilterService(val conf: BridgeConfiguration,
inboundSession = null
}
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
}
override fun stop() {

View File

@ -72,7 +72,7 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
maxMessageSize,
conf.enableAMQPPacketTrace)
onConnectSubscription = server.onConnection.subscribe(_onConnection)
onConnectAuditSubscription = server.onConnection.subscribe {
onConnectAuditSubscription = server.onConnection.subscribe({
if (it.connected) {
auditService.successfulConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Successful AMQP inbound connection")
@ -80,7 +80,7 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
auditService.failedConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Failed AMQP inbound connection")
}
}
}, { log.error("Connection event error", it) })
onReceiveSubscription = server.onReceive.subscribe(_onReceive)
amqpServer = server
server.start()
@ -122,9 +122,9 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
}
override fun stop() {

View File

@ -68,17 +68,17 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
startControlListener()
} else {
stopControlListener()
}
stateHelper.active = it
}
incomingMessageSubscriber = amqpListener.onReceive.subscribe {
}, { log.error("Error in state change", it) })
incomingMessageSubscriber = amqpListener.onReceive.subscribe({
forwardReceivedMessage(it)
}
}, { log.error("Error in state change", it) })
}
private fun startControlListener() {
@ -93,8 +93,8 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
conf.crlCheckSoftFail,
maxMessageSize,
conf.enableAMQPPacketTrace)
connectSubscriber = controlServer.onConnection.subscribe { onConnectToControl(it) }
receiveSubscriber = controlServer.onReceive.subscribe { onControlMessage(it) }
connectSubscriber = controlServer.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
receiveSubscriber = controlServer.onReceive.subscribe({ onControlMessage(it) }, { log.error("Receive event error", it) })
amqpControlServer = controlServer
controlServer.start()
}

View File

@ -40,7 +40,7 @@ class InProcessBridgeReceiverService(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
val keyStoreBytes = sslConfiguration.sslKeystore.readAll()
val trustStoreBytes = sslConfiguration.trustStoreFile.readAll()
@ -55,10 +55,10 @@ class InProcessBridgeReceiverService(val conf: BridgeConfiguration,
}
}
stateHelper.active = it
}
receiveSubscriber = amqpListenerService.onReceive.subscribe {
}, { log.error("Error in state change", it) })
receiveSubscriber = amqpListenerService.onReceive.subscribe({
processMessage(it)
}
}, { log.error("Error in state change", it) })
}
private fun processMessage(receivedMessage: ReceivedMessage) {

View File

@ -70,7 +70,7 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
val floatAddresses = conf.bridgeInnerConfig!!.floatAddresses
val controlClient = AMQPClient(floatAddresses,
@ -83,15 +83,15 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
conf.crlCheckSoftFail,
maxMessageSize,
conf.enableAMQPPacketTrace)
connectSubscriber = controlClient.onConnection.subscribe { onConnectToControl(it) }
receiveSubscriber = controlClient.onReceive.subscribe { onFloatMessage(it) }
connectSubscriber = controlClient.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
receiveSubscriber = controlClient.onReceive.subscribe({ onFloatMessage(it) }, { log.error("Receive event error", it) })
amqpControlClient = controlClient
controlClient.start()
} else {
stateHelper.active = false
closeAMQPClient()
}
}
}, { log.error("Error in state change", it) })
}
private fun closeAMQPClient() {

View File

@ -32,7 +32,6 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
private var connectionSubscriber: Subscription? = null
private var listenerActiveSubscriber: Subscription? = null
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) })
@ -56,11 +55,11 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe { ready ->
statusSubscriber = statusFollower.activeChange.subscribe({ ready ->
if (ready) {
listenerActiveSubscriber = bridgeControlListener.activeChange.subscribe {
listenerActiveSubscriber = bridgeControlListener.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Bridge event error", it) })
bridgeControlListener.start()
auditService.statusChangeEvent("Waiting for activation by at least one bridge control inbox registration")
} else {
@ -69,7 +68,7 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
listenerActiveSubscriber = null
bridgeControlListener.stop()
}
}
}, { log.error("Error in state change", it) })
}
override fun stop() {
@ -77,8 +76,6 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
listenerActiveSubscriber?.unsubscribe()
listenerActiveSubscriber = null
bridgeControlListener.stop()
connectionSubscriber?.unsubscribe()
connectionSubscriber = null
statusSubscriber?.unsubscribe()
statusSubscriber = null
}

View File

@ -58,15 +58,15 @@ class BridgeSupervisorServiceImpl(val conf: BridgeConfiguration,
TunnelingBridgeReceiverService(conf, maxMessageSize, auditService, haService, filterService)
}
statusFollower = ServiceStateCombiner(listOf(haService, senderService, receiverService, filterService))
activeChange.subscribe {
activeChange.subscribe({
consoleLogger.info("BridgeSupervisorService: active = $it")
}
}, { log.error("Error in state change", it) })
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
artemisService.start()
senderService.start()
receiverService.start()

View File

@ -42,15 +42,15 @@ class FloatSupervisorServiceImpl(val conf: BridgeConfiguration,
null
}
statusFollower = ServiceStateCombiner(listOf(amqpListenerService, floatControlService).filterNotNull())
activeChange.subscribe {
activeChange.subscribe({
consoleLogger.info("FloatSupervisorService: active = $it")
}
}, { log.error("Error in state change", it) })
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
statusSubscriber = statusFollower.activeChange.subscribe({
stateHelper.active = it
}
}, { log.error("Error in state change", it) })
amqpListenerService.start()
floatControlService?.start()
}

View File

@ -110,9 +110,17 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
log.info("Stopping AMQP bridge")
lock.withLock {
synchronized(artemis) {
consumer?.close()
consumer?.apply {
if (!isClosed) {
close()
}
}
consumer = null
session?.stop()
session?.apply {
if (!isClosed) {
stop()
}
}
session = null
}
}
@ -135,9 +143,17 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
session.start()
} else {
log.info("Bridge Disconnected")
consumer?.close()
consumer?.apply {
if (!isClosed) {
close()
}
}
consumer = null
session?.stop()
session?.apply {
if (!isClosed) {
stop()
}
}
session = null
}
}