From ddcdb370b30f67e4b8a19641fd35ab786a30d46e Mon Sep 17 00:00:00 2001 From: bpaunescu Date: Wed, 20 Jun 2018 13:06:24 +0100 Subject: [PATCH] ENT-2053 Artemis cleanup fix (#987) * Bridge kill test Fix Artemis settings ENT-2053: add quasar for gradle file Add several flow + bridge kill test Debugging Potential fix * ENT-2053: create p2p queues in exclusive mode to avoid reordering when bridge is killed * ENT-2053: add exclusive flag to rest of p2p queues * ENT-2053: check addresses' types when creating queues for exclusive mode * Revert "Debugging" This reverts commit d48a49c91f3fba0609b9b744c78fc671f4a92076. * ENT-2053:address PR comments --- bridge/build.gradle | 1 + .../kotlin/net/corda/bridge/BridgeDriver.kt | 99 ++++++++++ .../net/corda/bridge/BridgeRestartTest.kt | 169 ++++++++++++++++++ .../services/messaging/P2PMessagingClient.kt | 12 +- 4 files changed, 277 insertions(+), 4 deletions(-) create mode 100644 bridge/src/integration-test/kotlin/net/corda/bridge/BridgeDriver.kt create mode 100644 bridge/src/integration-test/kotlin/net/corda/bridge/BridgeRestartTest.kt diff --git a/bridge/build.gradle b/bridge/build.gradle index 1f7f0671a5..cf6eb6b4d7 100644 --- a/bridge/build.gradle +++ b/bridge/build.gradle @@ -11,6 +11,7 @@ apply plugin: 'kotlin' apply plugin: 'java' apply plugin: 'net.corda.plugins.publish-utils' +apply plugin: 'net.corda.plugins.quasar-utils' description 'Corda peer bridging components' diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeDriver.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeDriver.kt new file mode 100644 index 0000000000..2d006f6867 --- /dev/null +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeDriver.kt @@ -0,0 +1,99 @@ +package net.corda.bridge + +import com.typesafe.config.ConfigFactory +import net.corda.bridge.services.api.BridgeConfiguration +import net.corda.bridge.services.config.BridgeConfigHelper +import net.corda.bridge.services.config.parseAsBridgeConfiguration +import net.corda.core.concurrent.CordaFuture +import net.corda.core.crypto.SecureHash +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.concurrent.flatMap +import net.corda.core.internal.concurrent.map +import net.corda.core.internal.div +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.getOrThrow +import net.corda.nodeapi.internal.DEV_CA_KEY_STORE_PASS +import net.corda.nodeapi.internal.crypto.X509Utilities +import net.corda.nodeapi.internal.crypto.loadKeyStore +import net.corda.testing.driver.NodeHandle +import net.corda.testing.node.internal.* +import java.io.File +import java.nio.file.Files +import java.nio.file.Path + + +data class BridgeHandle( + val baseDirectory: Path, + val process: Process, + val configuration: BridgeConfiguration, + val bridgePort: Int, + val brokerPort: Int, + val debugPort: Int? +) + +fun startBridgeProcess(bridgePath: Path, debugPort: Int?): Process { + return ProcessUtilities.startCordaProcess( + className = "net.corda.bridge.Bridge", + arguments = listOf("--base-directory", bridgePath.toString()), + jdwpPort = debugPort, + extraJvmArguments = listOf(), + workingDirectory = bridgePath, + maximumHeapSize = "200m" + ) +} + +fun DriverDSLImpl.startBridge(nodeName: CordaX500Name, bridgePort: Int, brokerPort: Int, configOverrides: Map): CordaFuture { + val nodeDirectory = baseDirectory(nodeName) + val bridgeFolder = File("$nodeDirectory-bridge") + bridgeFolder.mkdirs() + createNetworkParams(bridgeFolder.toPath()) + val initialConfig = ConfigFactory.parseResources(ConfigTest::class.java, "/net/corda/bridge/singleprocess/bridge.conf") + val portConfig = ConfigFactory.parseMap( + mapOf( + "outboundConfig" to mapOf( + "artemisBrokerAddress" to "localhost:$brokerPort" + ), + "inboundConfig" to mapOf( + "listeningAddress" to "0.0.0.0:$bridgePort" + ) + ) + ) + val config = ConfigFactory.parseMap(configOverrides).withFallback(portConfig).withFallback(initialConfig) + writeConfig(bridgeFolder.toPath(), "bridge.conf", config) + val bridgeConfig = BridgeConfigHelper.loadConfig(bridgeFolder.toPath()).parseAsBridgeConfiguration() + val nodeCertificateDirectory = nodeDirectory / "certificates" + val bridgeDebugPort = if (isDebug) debugPortAllocation.nextPort() else null + return pollUntilTrue("$nodeName keystore creation") { + try { + val keyStore = loadKeyStore(nodeCertificateDirectory / "sslkeystore.jks", DEV_CA_KEY_STORE_PASS) + keyStore.getCertificate(X509Utilities.CORDA_CLIENT_TLS) + true + } catch (throwable: Throwable) { + false + } + }.flatMap { + nodeCertificateDirectory.toFile().copyRecursively(File("${bridgeFolder.absolutePath}/certificates")) + + val bridgeProcess = startBridgeProcess(bridgeFolder.toPath(), bridgeDebugPort) + shutdownManager.registerProcessShutdown(bridgeProcess) + addressMustBeBoundFuture(executorService, NetworkHostAndPort("localhost", bridgePort)).map { + BridgeHandle( + baseDirectory = bridgeFolder.toPath(), + process = bridgeProcess, + configuration = bridgeConfig, + bridgePort = bridgePort, + brokerPort = brokerPort, + debugPort = bridgeDebugPort + ) + } + } +} + +fun DriverDSLImpl.bounceBridge(bridge: BridgeHandle) { + bridge.process.destroyForcibly() + val bridgeAddress = NetworkHostAndPort("localhost", bridge.bridgePort) + addressMustNotBeBound(executorService, bridgeAddress) + val newProcess = startBridgeProcess(bridge.baseDirectory, bridge.debugPort) + shutdownManager.registerProcessShutdown(newProcess) + addressMustBeBound(executorService, bridgeAddress) +} diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeRestartTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeRestartTest.kt new file mode 100644 index 0000000000..efe7b268f3 --- /dev/null +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeRestartTest.kt @@ -0,0 +1,169 @@ +package net.corda.bridge + +import co.paralleluniverse.fibers.Suspendable +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.concurrent.CordaFuture +import net.corda.core.flows.* +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.doneFuture +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap +import net.corda.node.services.Permissions +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.DUMMY_BANK_B_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.node.User +import net.corda.testing.node.internal.internalDriver +import org.junit.Test +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CountDownLatch +import kotlin.concurrent.thread +import kotlin.test.assertEquals + +class BridgeRestartTest { + companion object { + val pingStarted = ConcurrentHashMap>() + } + + @StartableByRPC + @InitiatingFlow + class Ping(val pongParty: Party, val times: Int) : FlowLogic() { + @Suspendable + override fun call() { + val pongSession = initiateFlow(pongParty) + pongSession.sendAndReceive(times) + pingStarted.getOrPut(runId) { openFuture() }.set(Unit) + for (i in 1 .. times) { + logger.info("PING $i") + val j = pongSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } + } + } + + @InitiatedBy(Ping::class) + class Pong(val pingSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val times = pingSession.sendAndReceive(Unit).unwrap { it } + for (i in 1 .. times) { + logger.info("PONG $i") + val j = pingSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } + } + } + + @Test + fun restartLongPingPongFlowRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + internalDriver(isDebug = true, startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.bridge")) { + val bFuture = startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + val bridgePort = 20005 + val brokerPort = 21005 + val aBridgeFuture = startBridge(DUMMY_BANK_A_NAME, bridgePort, brokerPort, mapOf( + "outboundConfig" to mapOf( + "artemisBrokerAddress" to "localhost:$brokerPort" + ), + "inboundConfig" to mapOf( + "listeningAddress" to "0.0.0.0:$bridgePort" + ) + )) + + + val aFuture = startNode( + providedName = DUMMY_BANK_A_NAME, + rpcUsers = listOf(demoUser), + customOverrides = mapOf( + "p2pAddress" to "localhost:$bridgePort", + "messagingServerAddress" to "0.0.0.0:$brokerPort", + "messagingServerExternal" to false, + "enterpriseConfiguration" to mapOf( + "externalBridge" to true + ) + ) + ) + + val a = aFuture.getOrThrow() + val b = bFuture.getOrThrow() + val aBridge = aBridgeFuture.getOrThrow() + + // We kill -9 and restart the bridge after a random sleep + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { + val handle = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100) + + val bridgeRestartThread = thread { + pingStarted.getOrPut(handle.id) { openFuture() }.getOrThrow() + val ms = Random().nextInt(5000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + bounceBridge(aBridge) + } + + handle.returnValue.getOrThrow() + bridgeRestartThread.join() + } + + } + } + + @Test + fun restartSeveralPingPongFlowsRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + internalDriver(isDebug = true, startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.bridge")) { + val bFuture = startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + val bridgePort = 20005 + val brokerPort = 21005 + val aBridgeFuture = startBridge(DUMMY_BANK_A_NAME, bridgePort, brokerPort, mapOf( + "outboundConfig" to mapOf( + "artemisBrokerAddress" to "localhost:$brokerPort" + ), + "inboundConfig" to mapOf( + "listeningAddress" to "0.0.0.0:$bridgePort" + ) + )) + + + val aFuture = startNode( + providedName = DUMMY_BANK_A_NAME, + rpcUsers = listOf(demoUser), + customOverrides = mapOf( + "p2pAddress" to "localhost:$bridgePort", + "messagingServerAddress" to "0.0.0.0:$brokerPort", + "messagingServerExternal" to false, + "enterpriseConfiguration" to mapOf( + "externalBridge" to true + ) + ) + ) + + val a = aFuture.getOrThrow() + val b = bFuture.getOrThrow() + val aBridge = aBridgeFuture.getOrThrow() + + // We kill -9 and restart the bridge after a random sleep + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { connection -> + val handles = (1 .. 10).map { + connection.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100) + } + + val bridgeRestartThread = thread(isDaemon = true) { + //pingStarted.getOrPut(handle.id) { openFuture() }.getOrThrow() + val ms = Random().nextInt(5000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + bounceBridge(aBridge) + } + + for (handle in handles) { + handle.returnValue.getOrThrow() + } + bridgeRestartThread.join() + } + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 187bb2235f..8bb0c2a716 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -49,6 +49,7 @@ import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.requireMessageSize +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER @@ -183,7 +184,8 @@ class P2PMessagingClient(val config: NodeConfiguration, inboxes += RemoteInboxAddress(it).queueName } - inboxes.forEach { createQueueIfAbsent(it, producerSession!!) } + inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true) } + p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents) val messagingExecutor = MessagingExecutor( @@ -495,14 +497,14 @@ class P2PMessagingClient(val config: NodeConfiguration, val internalTargetQueue = (address as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address") state.locked { - createQueueIfAbsent(internalTargetQueue, producerSession!!) + createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = address !is ServiceAddress) } internalTargetQueue } } /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ - private fun createQueueIfAbsent(queueName: String, session: ClientSession) { + private fun createQueueIfAbsent(queueName: String, session: ClientSession, exclusive: Boolean) { fun sendBridgeCreateMessage() { val keyHash = queueName.substring(PEERS_PREFIX.length) val peers = networkMap.getNodesByOwningKeyIndex(keyHash) @@ -521,7 +523,9 @@ class P2PMessagingClient(val config: NodeConfiguration, val queueQuery = session.queueQuery(SimpleString(queueName)) if (!queueQuery.isExists) { log.info("Create fresh queue $queueName bound on same address") - session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) + session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), exclusive, null) sendBridgeCreateMessage() } }