Fixed bug where messages can't be sent to services running on the network map node (#71)

This commit is contained in:
Shams Asari 2016-12-20 12:01:52 +00:00
parent 6ad3ca48d3
commit 81d1459599
7 changed files with 179 additions and 76 deletions

View File

@ -27,10 +27,7 @@ import java.nio.file.*
import java.nio.file.attribute.FileAttribute
import java.time.Duration
import java.time.temporal.Temporal
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.Future
import java.util.concurrent.*
import java.util.concurrent.locks.ReentrantLock
import java.util.function.BiConsumer
import java.util.stream.Stream
@ -68,9 +65,9 @@ infix fun Long.checkedAdd(b: Long) = Math.addExact(this, b)
fun random63BitValue(): Long = Math.abs(newSecureRandom().nextLong())
/** Same as [Future.get] but with a more descriptive name, and doesn't throw [ExecutionException], instead throwing its cause */
fun <T> Future<T>.getOrThrow(): T {
try {
return get()
fun <T> Future<T>.getOrThrow(timeout: Duration? = null): T {
return try {
if (timeout == null) get() else get(timeout.toNanos(), TimeUnit.NANOSECONDS)
} catch (e: ExecutionException) {
throw e.cause!!
}

View File

@ -1,26 +0,0 @@
package net.corda.services.messaging
import net.corda.node.driver.driver
import net.corda.node.driver.getTimestampAsDirectoryName
import org.junit.Test
import java.nio.file.Paths
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit
class ArtemisMessagingServerTest {
@Test
fun `network map will work after restart`() {
val dir = Paths.get("build", getTimestampAsDirectoryName())
// Start the network map.
driver(driverDirectory = dir) {
arrayOf(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).forEach { it.get() }
}
// Start the network map second time, this will restore message queues from the journal.
// This will hang and fail prior the fix. https://github.com/corda/corda/issues/37
driver(driverDirectory = dir) {
arrayOf(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).forEach { it.get(5, TimeUnit.MINUTES) }
}
}
}

View File

@ -0,0 +1,101 @@
package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.flows.FlowLogic
import net.corda.core.future
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.seconds
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.node.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.util.*
class P2PMessagingTest : NodeBasedTest() {
@Test
fun `network map will work after restart`() {
fun startNodes() {
startNode("NodeA")
startNode("NodeB")
startNode("Notary")
}
startNodes()
// Start the network map second time, this will restore message queues from the journal.
// This will hang and fail prior the fix. https://github.com/corda/corda/issues/37
stopAllNodes()
future {
startNodes()
}.getOrThrow(30.seconds)
}
// https://github.com/corda/corda/issues/71
@Test
fun `sending message to a service running on the network map node`() {
startNetworkMapNode(advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
networkMapNode.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(it, "Hello") }
val serviceParty = networkMapNode.services.networkMapCache.getAnyNotary()!!
val received = startNode("Alice").services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds)
assertThat(received).isEqualTo("Hello")
}
@Test
fun `sending message to a distributed service which the network map node is part of`() {
val serviceName = "DistributedService"
val root = tempFolder.root.toPath()
ServiceIdentityGenerator.generateToDisk(
listOf(
root / "NetworkMap",
root / "Alice"),
RaftValidatingNotaryService.type.id,
serviceName)
val distributedService = ServiceInfo(RaftValidatingNotaryService.type, serviceName)
val notaryClusterAddress = freeLocalHostAndPort()
startNetworkMapNode(
"NetworkMap",
advertisedServices = setOf(distributedService),
configOverrides = mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
val alice = startNode(
"Alice",
advertisedServices = setOf(distributedService),
configOverrides = mapOf(
"notaryNodeAddress" to freeLocalHostAndPort().toString(),
"notaryClusterAddresses" to listOf(notaryClusterAddress.toString())))
val bob = startNode("Bob")
// Setup each node in the distributed service to return back it's Party so that we can know which node is being used
val serviceNodes = listOf(networkMapNode, alice)
serviceNodes.forEach { node ->
node.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(it, node.info.legalIdentity) }
}
val serviceParty = networkMapNode.services.networkMapCache.getNotary(serviceName)!!
val participatingParties = HashSet<Any>()
// Try up to 4 times so that we can be fairly sure that any node not participating is not due to Artemis' selection strategy
for (it in 1..5) {
participatingParties += bob.services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds)
if (participatingParties.size == 2) {
break
}
}
assertThat(participatingParties).containsOnlyElementsOf(serviceNodes.map { it.info.legalIdentity })
}
private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() = send(otherParty, payload)
}
private class ReceiveFlow(val otherParty: Party) : FlowLogic<Any>() {
@Suspendable
override fun call() = receive<Any>(otherParty).unwrap { it }
}
}

View File

@ -110,7 +110,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
if (!query.isExists) {
activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false)
}
maybeDeployBridgeForAddress(networkMapService)
deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort)
}
/**
@ -148,10 +148,10 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
(addressesToRemoveBridgesFrom - addressesToCreateBridgesTo).forEach {
maybeDestroyBridge(bridgeNameForAddress(it))
activeMQServer.destroyBridge(getBridgeName(it.queueName, it.hostAndPort))
}
addressesToCreateBridgesTo.forEach {
if (activeMQServer.queueQuery(it.queueName).isExists) maybeDeployBridgeForAddress(it)
addressesToCreateBridgesTo.filter { activeMQServer.queueQuery(it.queueName).isExists }.forEach {
deployBridgeIfAbsent(it.queueName, it.hostAndPort)
}
}
@ -171,12 +171,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
private fun maybeDeployBridgeForNode(queueName: SimpleString, nodeInfo: NodeInfo) {
log.debug("Deploying bridge for $queueName to $nodeInfo")
val address = nodeInfo.address
if (address is NodeAddress) {
maybeDeployBridgeForAddress(NodeAddress(queueName, address.hostAndPort))
if (address is ArtemisPeerAddress) {
log.debug("Deploying bridge for $queueName to $nodeInfo")
deployBridgeIfAbsent(queueName, address.hostAndPort)
} else {
log.error("Don't know how to deal with $address")
log.error("Don't know how to deal with $address for queue $queueName")
}
}
@ -284,17 +284,17 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name)
private fun maybeDeployBridgeForAddress(peerAddress: ArtemisPeerAddress) {
if (!connectorExists(peerAddress.hostAndPort)) {
addConnector(peerAddress.hostAndPort)
private fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) {
if (!connectorExists(hostAndPort)) {
addConnector(hostAndPort)
}
val bridgeName = bridgeNameForAddress(peerAddress)
val bridgeName = getBridgeName(queueName, hostAndPort)
if (!bridgeExists(bridgeName)) {
deployBridge(bridgeName, peerAddress)
deployBridge(bridgeName, queueName, hostAndPort)
}
}
private fun bridgeNameForAddress(peerAddress: ArtemisPeerAddress) = "${peerAddress.queueName}-${peerAddress.hostAndPort}"
private fun getBridgeName(queueName: SimpleString, hostAndPort: HostAndPort) = "$queueName -> $hostAndPort"
/**
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
@ -302,12 +302,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address.
*/
private fun deployBridge(bridgeName: String, peerAddress: ArtemisPeerAddress) {
private fun deployBridge(bridgeName: String, queueName: SimpleString, hostAndPort: HostAndPort) {
activeMQServer.deployBridge(BridgeConfiguration().apply {
name = bridgeName
queueName = peerAddress.queueName.toString()
this.queueName = queueName.toString()
forwardingAddress = P2P_QUEUE
staticConnectors = listOf(peerAddress.hostAndPort.toString())
staticConnectors = listOf(hostAndPort.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
@ -317,12 +317,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
})
}
private fun maybeDestroyBridge(name: String) {
if (bridgeExists(name)) {
activeMQServer.destroyBridge(name)
}
}
/**
* Clients must connect to us with a username and password and must use TLS. If a someone connects with
* [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate

View File

@ -379,7 +379,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
}
log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}")
log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} " +
"uuid: ${message.uniqueMessageId}")
producer!!.send(mqAddress, artemisMessage)
}
}
@ -391,6 +392,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
} else {
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
// broker's job to route the message to the target's P2P queue.
// TODO Make sure that if target is a service that we're part of and the broker routes the message back to us
// it doesn't cause any issues.
val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address")
createQueueIfAbsent(internalTargetQueue)
internalTargetQueue

View File

@ -42,6 +42,7 @@ import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertNull
//TODO This needs to be merged into P2PMessagingTest as that creates a more realistic environment
class ArtemisMessagingTests {
@Rule @JvmField val temporaryFolder = TemporaryFolder()

View File

@ -1,13 +1,15 @@
package net.corda.testing.node
import net.corda.core.createDirectories
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.node.internal.Node
import net.corda.node.services.User
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.testing.freeLocalHostAndPort
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.rules.TemporaryFolder
import java.util.*
@ -17,43 +19,74 @@ import kotlin.concurrent.thread
* Extend this class if you need to run nodes in a test. You could use the driver DSL but it's extremely slow for testing
* purposes.
*/
// TODO Some of the logic here duplicates what's in the driver
abstract class NodeBasedTest {
@Rule
@JvmField
val tempFolder = TemporaryFolder()
private val nodes = ArrayList<Node>()
lateinit var networkMapNode: Node
private var _networkMapNode: Node? = null
@Before
fun startNetworkMapNode() {
networkMapNode = startNode("Network Map", emptyMap())
}
val networkMapNode: Node get() = _networkMapNode ?: startNetworkMapNode()
/**
* Stops the network map node and all the nodes started by [startNode]. This is called automatically after each test
* but can also be called manually within a test.
*/
@After
fun stopNodes() {
fun stopAllNodes() {
nodes.forEach(Node::stop)
nodes.clear()
_networkMapNode = null
}
fun startNode(legalName: String, rpcUsers: List<User> = emptyList()): Node {
return startNode(legalName, mapOf(
"networkMapAddress" to networkMapNode.configuration.artemisAddress.toString(),
"rpcUsers" to rpcUsers.map { mapOf(
"user" to it.username,
"password" to it.password,
"permissions" to it.permissions)
}
))
/**
* You can use this method to start the network map node in a more customised manner. Otherwise it
* will automatically be started with the default parameters.
*/
fun startNetworkMapNode(legalName: String = "Network Map",
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): Node {
check(_networkMapNode == null)
return startNodeInternal(legalName, advertisedServices, rpcUsers, configOverrides).apply {
_networkMapNode = this
}
}
private fun startNode(legalName: String, configOverrides: Map<String, Any>): Node {
fun startNode(legalName: String,
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): Node {
return startNodeInternal(
legalName,
advertisedServices,
rpcUsers,
configOverrides + mapOf(
"networkMapAddress" to networkMapNode.configuration.artemisAddress.toString()
)
)
}
private fun startNodeInternal(legalName: String,
advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>,
configOverrides: Map<String, Any>): Node {
val config = ConfigHelper.loadConfig(
baseDirectoryPath = tempFolder.newFolder(legalName).toPath(),
baseDirectoryPath = (tempFolder.root.toPath() / legalName).createDirectories(),
allowMissingConfig = true,
configOverrides = configOverrides + mapOf(
"myLegalName" to legalName,
"artemisAddress" to freeLocalHostAndPort().toString(),
"extraAdvertisedServiceIds" to ""
"extraAdvertisedServiceIds" to advertisedServices.joinToString(","),
"rpcUsers" to rpcUsers.map {
mapOf(
"user" to it.username,
"password" to it.password,
"permissions" to it.permissions
)
}
)
)