mirror of
https://github.com/corda/corda.git
synced 2025-02-07 11:30:22 +00:00
This fixes the node startup issue - https://github.com/corda/corda/issues/37 (#48)
* Create missing artemis bridge when node is added to the network map * Added integration test for this issue * addressed PR issues
This commit is contained in:
parent
fc69624f41
commit
a4201c7152
@ -0,0 +1,29 @@
|
|||||||
|
package net.corda.services.messaging
|
||||||
|
|
||||||
|
import net.corda.node.driver.driver
|
||||||
|
import org.junit.Test
|
||||||
|
import java.nio.file.Paths
|
||||||
|
import java.time.Instant
|
||||||
|
import java.time.ZoneOffset
|
||||||
|
import java.time.format.DateTimeFormatter
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
class ArtemisMessagingServerTest {
|
||||||
|
@Test
|
||||||
|
fun `network map will work after restart`() {
|
||||||
|
val dir = Paths.get("build", getTimestampAsDirectoryName())
|
||||||
|
// Start the network map.
|
||||||
|
driver(driverDirectory = dir) {
|
||||||
|
arrayOf(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).forEach { it.get() }
|
||||||
|
}
|
||||||
|
// Start the network map second time, this will restore message queues from the journal.
|
||||||
|
// This will hang and fail prior the fix. https://github.com/corda/corda/issues/37
|
||||||
|
driver(driverDirectory = dir) {
|
||||||
|
arrayOf(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).forEach { it.get(5, TimeUnit.MINUTES) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun getTimestampAsDirectoryName(): String {
|
||||||
|
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC).format(Instant.now())
|
||||||
|
}
|
||||||
|
}
|
@ -85,10 +85,14 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
|||||||
config.basedir.expectedOnDefaultFileSystem()
|
config.basedir.expectedOnDefaultFileSystem()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The server will make sure the bridge exists on network map changes, see method [destroyOrCreateBridge]
|
||||||
|
* We assume network map will be updated accordingly when the client node register with the network map server.
|
||||||
|
*/
|
||||||
fun start() = mutex.locked {
|
fun start() = mutex.locked {
|
||||||
if (!running) {
|
if (!running) {
|
||||||
configureAndStartServer()
|
configureAndStartServer()
|
||||||
networkChangeHandle = networkMapCache.changed.subscribe { destroyPossibleStaleBridge(it) }
|
networkChangeHandle = networkMapCache.changed.subscribe { destroyOrCreateBridge(it) }
|
||||||
running = true
|
running = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -108,13 +112,22 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
|||||||
maybeDeployBridgeForAddress(networkMapService)
|
maybeDeployBridgeForAddress(networkMapService)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun destroyPossibleStaleBridge(change: MapChange) {
|
/**
|
||||||
val staleNodeInfo = when (change) {
|
* The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted.
|
||||||
is MapChange.Modified -> change.previousNode
|
* The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues.
|
||||||
is MapChange.Removed -> change.node
|
* This results in message queues up and never get send out. (https://github.com/corda/corda/issues/37)
|
||||||
is MapChange.Added -> return
|
*
|
||||||
|
* We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup.
|
||||||
|
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
|
||||||
|
*/
|
||||||
|
private fun destroyOrCreateBridge(change: MapChange) {
|
||||||
|
val (newNode, staleNode) = when (change) {
|
||||||
|
is MapChange.Modified -> change.node to change.previousNode
|
||||||
|
is MapChange.Removed -> null to change.node
|
||||||
|
is MapChange.Added -> change.node to null
|
||||||
}
|
}
|
||||||
(staleNodeInfo.address as? ArtemisAddress)?.let { maybeDestroyBridge(it.queueName) }
|
(staleNode?.address as? ArtemisAddress)?.let { maybeDestroyBridge(it.queueName) }
|
||||||
|
(newNode?.address as? ArtemisAddress)?.let { if (activeMQServer.queueQuery(it.queueName).isExists) maybeDeployBridgeForAddress(it) }
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun configureAndStartServer() {
|
private fun configureAndStartServer() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user