mirror of
https://github.com/corda/corda.git
synced 2025-01-31 00:24:59 +00:00
ENT-2053 Artemis cleanup fix (#987)
* Bridge kill test Fix Artemis settings ENT-2053: add quasar for gradle file Add several flow + bridge kill test Debugging Potential fix * ENT-2053: create p2p queues in exclusive mode to avoid reordering when bridge is killed * ENT-2053: add exclusive flag to rest of p2p queues * ENT-2053: check addresses' types when creating queues for exclusive mode * Revert "Debugging" This reverts commit d48a49c91f3fba0609b9b744c78fc671f4a92076. * ENT-2053:address PR comments
This commit is contained in:
parent
80d7c28606
commit
ddcdb370b3
@ -11,6 +11,7 @@
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: 'java'
|
||||
apply plugin: 'net.corda.plugins.publish-utils'
|
||||
apply plugin: 'net.corda.plugins.quasar-utils'
|
||||
|
||||
description 'Corda peer bridging components'
|
||||
|
||||
|
@ -0,0 +1,99 @@
|
||||
package net.corda.bridge
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.bridge.services.api.BridgeConfiguration
|
||||
import net.corda.bridge.services.config.BridgeConfigHelper
|
||||
import net.corda.bridge.services.config.parseAsBridgeConfiguration
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.concurrent.flatMap
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.nodeapi.internal.DEV_CA_KEY_STORE_PASS
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.loadKeyStore
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.node.internal.*
|
||||
import java.io.File
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
|
||||
|
||||
data class BridgeHandle(
|
||||
val baseDirectory: Path,
|
||||
val process: Process,
|
||||
val configuration: BridgeConfiguration,
|
||||
val bridgePort: Int,
|
||||
val brokerPort: Int,
|
||||
val debugPort: Int?
|
||||
)
|
||||
|
||||
fun startBridgeProcess(bridgePath: Path, debugPort: Int?): Process {
|
||||
return ProcessUtilities.startCordaProcess(
|
||||
className = "net.corda.bridge.Bridge",
|
||||
arguments = listOf("--base-directory", bridgePath.toString()),
|
||||
jdwpPort = debugPort,
|
||||
extraJvmArguments = listOf(),
|
||||
workingDirectory = bridgePath,
|
||||
maximumHeapSize = "200m"
|
||||
)
|
||||
}
|
||||
|
||||
fun DriverDSLImpl.startBridge(nodeName: CordaX500Name, bridgePort: Int, brokerPort: Int, configOverrides: Map<String, Any>): CordaFuture<BridgeHandle> {
|
||||
val nodeDirectory = baseDirectory(nodeName)
|
||||
val bridgeFolder = File("$nodeDirectory-bridge")
|
||||
bridgeFolder.mkdirs()
|
||||
createNetworkParams(bridgeFolder.toPath())
|
||||
val initialConfig = ConfigFactory.parseResources(ConfigTest::class.java, "/net/corda/bridge/singleprocess/bridge.conf")
|
||||
val portConfig = ConfigFactory.parseMap(
|
||||
mapOf(
|
||||
"outboundConfig" to mapOf(
|
||||
"artemisBrokerAddress" to "localhost:$brokerPort"
|
||||
),
|
||||
"inboundConfig" to mapOf(
|
||||
"listeningAddress" to "0.0.0.0:$bridgePort"
|
||||
)
|
||||
)
|
||||
)
|
||||
val config = ConfigFactory.parseMap(configOverrides).withFallback(portConfig).withFallback(initialConfig)
|
||||
writeConfig(bridgeFolder.toPath(), "bridge.conf", config)
|
||||
val bridgeConfig = BridgeConfigHelper.loadConfig(bridgeFolder.toPath()).parseAsBridgeConfiguration()
|
||||
val nodeCertificateDirectory = nodeDirectory / "certificates"
|
||||
val bridgeDebugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
||||
return pollUntilTrue("$nodeName keystore creation") {
|
||||
try {
|
||||
val keyStore = loadKeyStore(nodeCertificateDirectory / "sslkeystore.jks", DEV_CA_KEY_STORE_PASS)
|
||||
keyStore.getCertificate(X509Utilities.CORDA_CLIENT_TLS)
|
||||
true
|
||||
} catch (throwable: Throwable) {
|
||||
false
|
||||
}
|
||||
}.flatMap {
|
||||
nodeCertificateDirectory.toFile().copyRecursively(File("${bridgeFolder.absolutePath}/certificates"))
|
||||
|
||||
val bridgeProcess = startBridgeProcess(bridgeFolder.toPath(), bridgeDebugPort)
|
||||
shutdownManager.registerProcessShutdown(bridgeProcess)
|
||||
addressMustBeBoundFuture(executorService, NetworkHostAndPort("localhost", bridgePort)).map {
|
||||
BridgeHandle(
|
||||
baseDirectory = bridgeFolder.toPath(),
|
||||
process = bridgeProcess,
|
||||
configuration = bridgeConfig,
|
||||
bridgePort = bridgePort,
|
||||
brokerPort = brokerPort,
|
||||
debugPort = bridgeDebugPort
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun DriverDSLImpl.bounceBridge(bridge: BridgeHandle) {
|
||||
bridge.process.destroyForcibly()
|
||||
val bridgeAddress = NetworkHostAndPort("localhost", bridge.bridgePort)
|
||||
addressMustNotBeBound(executorService, bridgeAddress)
|
||||
val newProcess = startBridgeProcess(bridge.baseDirectory, bridge.debugPort)
|
||||
shutdownManager.registerProcessShutdown(newProcess)
|
||||
addressMustBeBound(executorService, bridgeAddress)
|
||||
}
|
@ -0,0 +1,169 @@
|
||||
package net.corda.bridge
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.internalDriver
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class BridgeRestartTest {
|
||||
companion object {
|
||||
val pingStarted = ConcurrentHashMap<StateMachineRunId, OpenFuture<Unit>>()
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class Ping(val pongParty: Party, val times: Int) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val pongSession = initiateFlow(pongParty)
|
||||
pongSession.sendAndReceive<Unit>(times)
|
||||
pingStarted.getOrPut(runId) { openFuture() }.set(Unit)
|
||||
for (i in 1 .. times) {
|
||||
logger.info("PING $i")
|
||||
val j = pongSession.sendAndReceive<Int>(i).unwrap { it }
|
||||
assertEquals(i, j)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(Ping::class)
|
||||
class Pong(val pingSession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val times = pingSession.sendAndReceive<Int>(Unit).unwrap { it }
|
||||
for (i in 1 .. times) {
|
||||
logger.info("PONG $i")
|
||||
val j = pingSession.sendAndReceive<Int>(i).unwrap { it }
|
||||
assertEquals(i, j)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun restartLongPingPongFlowRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
|
||||
internalDriver(isDebug = true, startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.bridge")) {
|
||||
val bFuture = startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
val bridgePort = 20005
|
||||
val brokerPort = 21005
|
||||
val aBridgeFuture = startBridge(DUMMY_BANK_A_NAME, bridgePort, brokerPort, mapOf(
|
||||
"outboundConfig" to mapOf(
|
||||
"artemisBrokerAddress" to "localhost:$brokerPort"
|
||||
),
|
||||
"inboundConfig" to mapOf(
|
||||
"listeningAddress" to "0.0.0.0:$bridgePort"
|
||||
)
|
||||
))
|
||||
|
||||
|
||||
val aFuture = startNode(
|
||||
providedName = DUMMY_BANK_A_NAME,
|
||||
rpcUsers = listOf(demoUser),
|
||||
customOverrides = mapOf(
|
||||
"p2pAddress" to "localhost:$bridgePort",
|
||||
"messagingServerAddress" to "0.0.0.0:$brokerPort",
|
||||
"messagingServerExternal" to false,
|
||||
"enterpriseConfiguration" to mapOf(
|
||||
"externalBridge" to true
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
val a = aFuture.getOrThrow()
|
||||
val b = bFuture.getOrThrow()
|
||||
val aBridge = aBridgeFuture.getOrThrow()
|
||||
|
||||
// We kill -9 and restart the bridge after a random sleep
|
||||
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
|
||||
val handle = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100)
|
||||
|
||||
val bridgeRestartThread = thread {
|
||||
pingStarted.getOrPut(handle.id) { openFuture() }.getOrThrow()
|
||||
val ms = Random().nextInt(5000)
|
||||
println("Sleeping $ms ms before kill")
|
||||
Thread.sleep(ms.toLong())
|
||||
bounceBridge(aBridge)
|
||||
}
|
||||
|
||||
handle.returnValue.getOrThrow()
|
||||
bridgeRestartThread.join()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun restartSeveralPingPongFlowsRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
|
||||
internalDriver(isDebug = true, startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.bridge")) {
|
||||
val bFuture = startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
val bridgePort = 20005
|
||||
val brokerPort = 21005
|
||||
val aBridgeFuture = startBridge(DUMMY_BANK_A_NAME, bridgePort, brokerPort, mapOf(
|
||||
"outboundConfig" to mapOf(
|
||||
"artemisBrokerAddress" to "localhost:$brokerPort"
|
||||
),
|
||||
"inboundConfig" to mapOf(
|
||||
"listeningAddress" to "0.0.0.0:$bridgePort"
|
||||
)
|
||||
))
|
||||
|
||||
|
||||
val aFuture = startNode(
|
||||
providedName = DUMMY_BANK_A_NAME,
|
||||
rpcUsers = listOf(demoUser),
|
||||
customOverrides = mapOf(
|
||||
"p2pAddress" to "localhost:$bridgePort",
|
||||
"messagingServerAddress" to "0.0.0.0:$brokerPort",
|
||||
"messagingServerExternal" to false,
|
||||
"enterpriseConfiguration" to mapOf(
|
||||
"externalBridge" to true
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
val a = aFuture.getOrThrow()
|
||||
val b = bFuture.getOrThrow()
|
||||
val aBridge = aBridgeFuture.getOrThrow()
|
||||
|
||||
// We kill -9 and restart the bridge after a random sleep
|
||||
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { connection ->
|
||||
val handles = (1 .. 10).map {
|
||||
connection.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100)
|
||||
}
|
||||
|
||||
val bridgeRestartThread = thread(isDaemon = true) {
|
||||
//pingStarted.getOrPut(handle.id) { openFuture() }.getOrThrow()
|
||||
val ms = Random().nextInt(5000)
|
||||
println("Sleeping $ms ms before kill")
|
||||
Thread.sleep(ms.toLong())
|
||||
bounceBridge(aBridge)
|
||||
}
|
||||
|
||||
for (handle in handles) {
|
||||
handle.returnValue.getOrThrow()
|
||||
}
|
||||
bridgeRestartThread.join()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -49,6 +49,7 @@ import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.requireMessageSize
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
||||
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
||||
@ -183,7 +184,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
inboxes += RemoteInboxAddress(it).queueName
|
||||
}
|
||||
|
||||
inboxes.forEach { createQueueIfAbsent(it, producerSession!!) }
|
||||
inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true) }
|
||||
|
||||
p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents)
|
||||
|
||||
val messagingExecutor = MessagingExecutor(
|
||||
@ -495,14 +497,14 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
val internalTargetQueue = (address as? ArtemisAddress)?.queueName
|
||||
?: throw IllegalArgumentException("Not an Artemis address")
|
||||
state.locked {
|
||||
createQueueIfAbsent(internalTargetQueue, producerSession!!)
|
||||
createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = address !is ServiceAddress)
|
||||
}
|
||||
internalTargetQueue
|
||||
}
|
||||
}
|
||||
|
||||
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
|
||||
private fun createQueueIfAbsent(queueName: String, session: ClientSession) {
|
||||
private fun createQueueIfAbsent(queueName: String, session: ClientSession, exclusive: Boolean) {
|
||||
fun sendBridgeCreateMessage() {
|
||||
val keyHash = queueName.substring(PEERS_PREFIX.length)
|
||||
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
|
||||
@ -521,7 +523,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
val queueQuery = session.queueQuery(SimpleString(queueName))
|
||||
if (!queueQuery.isExists) {
|
||||
log.info("Create fresh queue $queueName bound on same address")
|
||||
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
|
||||
session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false,
|
||||
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
|
||||
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), exclusive, null)
|
||||
sendBridgeCreateMessage()
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user