Testing that a node can communicate with a distributed service it's part of

This commit is contained in:
Shams Asari 2016-12-29 16:46:14 +00:00
parent e55833d147
commit 334b91faf0
9 changed files with 136 additions and 122 deletions

View File

@ -202,6 +202,13 @@ fun <T> List<T>.randomOrNull(predicate: (T) -> Boolean) = filter(predicate).rand
// An alias that can sometimes make code clearer to read.
val RunOnCallerThread: Executor = MoreExecutors.directExecutor()
inline fun elapsedTime(block: () -> Unit): Duration {
val start = System.nanoTime()
block()
val end = System.nanoTime()
return Duration.ofNanos(end-start)
}
// TODO: Add inline back when a new Kotlin version is released and check if the java.lang.VerifyError
// returns in the IRSSimulationTest. If not, commit the inline back.
fun <T> logElapsedTime(label: String, logger: Logger? = null, body: () -> T): T {
@ -274,19 +281,17 @@ class TransientProperty<out T>(private val initializer: () -> T) {
/**
* Given a path to a zip file, extracts it to the given directory.
*/
fun extractZipFile(zipPath: Path, toPath: Path) {
val normalisedToPath = toPath.normalize()
normalisedToPath.createDirectories()
fun extractZipFile(zipFile: Path, toDirectory: Path) {
val normalisedDirectory = toDirectory.normalize().createDirectories()
zipPath.read {
zipFile.read {
val zip = ZipInputStream(BufferedInputStream(it))
while (true) {
val e = zip.nextEntry ?: break
val outPath = normalisedToPath / e.name
val outPath = (normalisedDirectory / e.name).normalize()
// Security checks: we should reject a zip that contains tricksy paths that try to escape toPath.
if (!outPath.normalize().startsWith(normalisedToPath))
throw IllegalStateException("ZIP contained a path that resolved incorrectly: ${e.name}")
// Security checks: we should reject a zip that contains tricksy paths that try to escape toDirectory.
check(outPath.startsWith(normalisedDirectory)) { "ZIP contained a path that resolved incorrectly: ${e.name}" }
if (e.isDirectory) {
outPath.createDirectories()

View File

@ -1,10 +1,7 @@
package net.corda.flows
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.messaging.MessagingService
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.onNext
import net.corda.core.messaging.send
import net.corda.core.messaging.*
import net.corda.core.node.services.DEFAULT_SESSION_ID
/**
@ -21,7 +18,7 @@ interface ServiceRequestMessage {
*/
fun <R : Any> MessagingService.sendRequest(topic: String,
request: ServiceRequestMessage,
target: SingleMessageRecipient): ListenableFuture<R> {
target: MessageRecipients): ListenableFuture<R> {
val responseFuture = onNext<R>(topic, request.sessionID)
send(topic, DEFAULT_SESSION_ID, request, target)
return responseFuture

View File

@ -1,25 +1,18 @@
package net.corda.node.services
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.DummyContract
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionType
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.flatMap
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.map
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.Node
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.getFreeLocalPorts
import net.corda.testing.node.NodeBasedTest
import org.junit.Test
import java.security.KeyPair
@ -29,11 +22,13 @@ import kotlin.test.assertFailsWith
class RaftNotaryServiceTests : NodeBasedTest() {
private val notaryName = "RAFT Notary Service"
private val clusterSize = 3
@Test
fun `detect double spend`() {
val (masterNode, alice) = Futures.allAsList(createNotaryCluster(), startNode("Alice")).getOrThrow()
val (masterNode, alice) = Futures.allAsList(
startNotaryCluster(notaryName, 3).map { it.first() },
startNode("Alice")
).getOrThrow()
val notaryParty = alice.netMapCache.getNotary(notaryName)!!
val notaryNodeKeyPair = databaseTransaction(masterNode.database) { masterNode.services.notaryIdentityKey }
@ -61,31 +56,6 @@ class RaftNotaryServiceTests : NodeBasedTest() {
assertEquals(error.tx, secondSpendTx.tx)
}
private fun createNotaryCluster(): ListenableFuture<Node> {
val notaryService = ServiceInfo(RaftValidatingNotaryService.type, notaryName)
val notaryAddresses = getFreeLocalPorts("localhost", clusterSize).map { it.toString() }
ServiceIdentityGenerator.generateToDisk(
(0 until clusterSize).map { tempFolder.root.toPath() / "Notary$it" },
notaryService.type.id,
notaryName)
val masterNode = startNode(
"Notary0",
advertisedServices = setOf(notaryService),
configOverrides = mapOf("notaryNodeAddress" to notaryAddresses[0]))
val remainingNodes = (1 until clusterSize).map {
startNode(
"Notary$it",
advertisedServices = setOf(notaryService),
configOverrides = mapOf(
"notaryNodeAddress" to notaryAddresses[it],
"notaryClusterAddresses" to listOf(notaryAddresses[0])))
}
return Futures.allAsList(remainingNodes).flatMap { masterNode }
}
private fun issueState(node: AbstractNode, notary: Party, notaryKey: KeyPair): StateAndRef<*> {
return databaseTransaction(node.database) {
val tx = DummyContract.generateInitial(node.info.legalIdentity.ref(0), Random().nextInt(), notary)

View File

@ -1,14 +1,18 @@
package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.Futures
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.flows.FlowLogic
import net.corda.core.future
import net.corda.core.getOrThrow
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.*
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.createMessage
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.ServiceInfo
import net.corda.core.seconds
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.flows.ServiceRequestMessage
import net.corda.flows.sendRequest
import net.corda.node.internal.Node
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator
@ -21,39 +25,36 @@ import java.util.*
class P2PMessagingTest : NodeBasedTest() {
@Test
fun `network map will work after restart`() {
fun startNodes() {
Futures.allAsList(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).getOrThrow()
}
fun startNodes() = Futures.allAsList(startNode("NodeA"), startNode("NodeB"), startNode("Notary"))
startNodes()
// Start the network map second time, this will restore message queues from the journal.
val startUpDuration = elapsedTime { startNodes().getOrThrow() }
// Start the network map a second time - this will restore message queues from the journal.
// This will hang and fail prior the fix. https://github.com/corda/corda/issues/37
stopAllNodes()
future {
startNodes()
}.getOrThrow(30.seconds)
startNodes().getOrThrow(timeout = startUpDuration.multipliedBy(3))
}
// https://github.com/corda/corda/issues/71
@Test
fun `sending message to a service running on the network map node`() {
fun `communicating with a service running on the network map node`() {
startNetworkMapNode(advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
networkMapNode.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(it, "Hello") }
val serviceParty = networkMapNode.services.networkMapCache.getAnyNotary()!!
networkMapNode.respondWith("Hello")
val alice = startNode("Alice").getOrThrow()
val received = alice.services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds)
val serviceAddress = alice.services.networkMapCache.run {
alice.net.getAddressOfParty(getPartyInfo(getAnyNotary()!!)!!)
}
val received = alice.receiveFrom(serviceAddress).getOrThrow(10.seconds)
assertThat(received).isEqualTo("Hello")
}
// TODO Use a dummy distributed service
@Test
fun `sending message to a distributed service which the network map node is part of`() {
fun `communicating with a distributed service which the network map node is part of`() {
val serviceName = "DistributedService"
val root = tempFolder.root.toPath()
ServiceIdentityGenerator.generateToDisk(
listOf(
root / "NetworkMap",
root / "Alice"),
listOf(root / "NetworkMap", root / "Service Node 2"),
RaftValidatingNotaryService.type.id,
serviceName)
@ -63,41 +64,60 @@ class P2PMessagingTest : NodeBasedTest() {
"NetworkMap",
advertisedServices = setOf(distributedService),
configOverrides = mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
val (alice, bob) = Futures.allAsList(
startNode(
"Alice",
advertisedServices = setOf(distributedService),
configOverrides = mapOf(
"notaryNodeAddress" to freeLocalHostAndPort().toString(),
"notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))),
startNode("Bob")
val (serviceNode2, alice) = Futures.allAsList(
startNode(
"Service Node 2",
advertisedServices = setOf(distributedService),
configOverrides = mapOf(
"notaryNodeAddress" to freeLocalHostAndPort().toString(),
"notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))),
startNode("Alice")
).getOrThrow()
// Setup each node in the distributed service to return back it's Party so that we can know which node is being used
val serviceNodes = listOf(networkMapNode, alice)
serviceNodes.forEach { node ->
node.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(it, node.info.legalIdentity) }
}
assertAllNodesAreUsed(listOf(networkMapNode, serviceNode2), serviceName, alice)
}
val serviceParty = networkMapNode.services.networkMapCache.getNotary(serviceName)!!
val participatingParties = HashSet<Any>()
// Try up to 4 times so that we can be fairly sure that any node not participating is not due to Artemis' selection strategy
for (it in 1..5) {
participatingParties += bob.services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds)
if (participatingParties.size == 2) {
@Test
fun `communicating with a distributed service which we're part of`() {
val serviceName = "Distributed Service"
val distributedService = startNotaryCluster(serviceName, 2).getOrThrow()
assertAllNodesAreUsed(distributedService, serviceName, distributedService[0])
}
private fun assertAllNodesAreUsed(participatingServiceNodes: List<Node>, serviceName: String, originatingNode: Node) {
// Setup each node in the distributed service to return back it's NodeInfo so that we can know which node is being used
participatingServiceNodes.forEach { node ->
node.respondWith(node.info)
}
val serviceAddress = originatingNode.services.networkMapCache.run {
originatingNode.net.getAddressOfParty(getPartyInfo(getNotary(serviceName)!!)!!)
}
val participatingNodes = HashSet<Any>()
// Try several times so that we can be fairly sure that any node not participating is not due to Artemis' selection
// strategy. 3 attempts for each node seems to be sufficient.
// This is not testing the distribution of the requests - DistributedServiceTests already does that
for (it in 1..participatingServiceNodes.size * 3) {
participatingNodes += originatingNode.receiveFrom(serviceAddress).getOrThrow(10.seconds)
if (participatingNodes.size == participatingServiceNodes.size) {
break
}
}
assertThat(participatingParties).containsOnlyElementsOf(serviceNodes.map { it.info.legalIdentity })
assertThat(participatingNodes).containsOnlyElementsOf(participatingServiceNodes.map(Node::info))
}
private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() = send(otherParty, payload)
private fun Node.respondWith(message: Any) {
net.addMessageHandler(javaClass.name, DEFAULT_SESSION_ID) { netMessage, reg ->
val request = netMessage.data.deserialize<TestRequest>()
val response = net.createMessage(javaClass.name, request.sessionID, message.serialize().bytes)
net.send(response, request.replyTo)
}
}
private class ReceiveFlow(val otherParty: Party) : FlowLogic<Any>() {
@Suspendable
override fun call() = receive<Any>(otherParty).unwrap { it }
private fun Node.receiveFrom(target: MessageRecipients): ListenableFuture<Any> {
val request = TestRequest(replyTo = net.myAddress)
return net.sendRequest<Any>(javaClass.name, request, target)
}
private data class TestRequest(override val sessionID: Long = random63BitValue(),
override val replyTo: SingleMessageRecipient) : ServiceRequestMessage
}

View File

@ -2,12 +2,9 @@ package net.corda.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import kotlinx.support.jdk7.use
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.node.NodeInfo
import net.corda.core.random63BitValue
import net.corda.core.seconds
import net.corda.flows.sendRequest
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.configureWithDevSSLCertificate

View File

@ -377,8 +377,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
} else {
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
// broker's job to route the message to the target's P2P queue.
// TODO Make sure that if target is a service that we're part of and the broker routes the message back to us
// it doesn't cause any issues.
val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address")
createQueueIfAbsent(internalTargetQueue)
internalTargetQueue

View File

@ -1,4 +1,4 @@
package net.corda.node.services
package net.corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
@ -12,11 +12,11 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.createMessage
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.utilities.LogHelper
import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider
@ -163,8 +163,7 @@ class ArtemisMessagingTests {
fun `client should be able to send large numbers of messages to itself before network map is available and survive restart, then receive messages`() {
// Crank the iteration up as high as you want... just takes longer to run.
val iterations = 100
val settableFuture: SettableFuture<Unit> = SettableFuture.create()
networkMapRegistrationFuture = settableFuture
networkMapRegistrationFuture = SettableFuture.create()
val receivedMessages = LinkedBlockingQueue<Message>()

View File

@ -26,7 +26,6 @@ import rx.Subscriber
import java.net.ServerSocket
import java.nio.file.Path
import java.security.KeyPair
import java.time.Duration
import java.util.*
import kotlin.reflect.KClass
@ -164,13 +163,6 @@ inline fun <reified P : FlowLogic<*>> AbstractNode.initiateSingleShotFlow(
return future
}
inline fun elapsedTime(block: () -> Unit): Duration {
val start = System.nanoTime()
block()
val end = System.nanoTime()
return Duration.ofNanos(end-start)
}
data class TestNodeConfiguration(
override val basedir: Path,
override val myLegalName: String,

View File

@ -1,16 +1,21 @@
package net.corda.testing.node
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.createDirectories
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.flatMap
import net.corda.core.map
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.node.internal.Node
import net.corda.node.services.User
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.getFreeLocalPorts
import org.junit.After
import org.junit.Rule
import org.junit.rules.TemporaryFolder
@ -19,7 +24,7 @@ import kotlin.concurrent.thread
/**
* Extend this class if you need to run nodes in a test. You could use the driver DSL but it's extremely slow for testing
* purposes. Use the DSL if you need to run the nodes in separate processes otherwise this class will suffice.
* purposes. Use the driver if you need to run the nodes in separate processes otherwise this class will suffice.
*/
// TODO Some of the logic here duplicates what's in the driver
abstract class NodeBasedTest {
@ -52,7 +57,7 @@ abstract class NodeBasedTest {
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): Node {
check(_networkMapNode == null)
return startNodeInternal(legalName, advertisedServices, rpcUsers, configOverrides).getOrThrow().apply {
return startNodeInternal(legalName, advertisedServices, rpcUsers, configOverrides).apply {
_networkMapNode = this
}
}
@ -61,7 +66,7 @@ abstract class NodeBasedTest {
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): ListenableFuture<Node> {
return startNodeInternal(
val node = startNodeInternal(
legalName,
advertisedServices,
rpcUsers,
@ -72,12 +77,43 @@ abstract class NodeBasedTest {
)
) + configOverrides
)
return node.networkMapRegistrationFuture.map { node }
}
fun startNotaryCluster(notaryName: String,
clusterSize: Int,
serviceType: ServiceType = RaftValidatingNotaryService.type): ListenableFuture<List<Node>> {
ServiceIdentityGenerator.generateToDisk(
(0 until clusterSize).map { tempFolder.root.toPath() / "$notaryName-$it" },
serviceType.id,
notaryName)
val serviceInfo = ServiceInfo(serviceType, notaryName)
val nodeAddresses = getFreeLocalPorts("localhost", clusterSize).map { it.toString() }
val masterNodeFuture = startNode(
"$notaryName-0",
advertisedServices = setOf(serviceInfo),
configOverrides = mapOf("notaryNodeAddress" to nodeAddresses[0]))
val remainingNodesFutures = (1 until clusterSize).map {
startNode(
"$notaryName-$it",
advertisedServices = setOf(serviceInfo),
configOverrides = mapOf(
"notaryNodeAddress" to nodeAddresses[it],
"notaryClusterAddresses" to listOf(nodeAddresses[0])))
}
return Futures.allAsList(remainingNodesFutures).flatMap { remainingNodes ->
masterNodeFuture.map { masterNode -> listOf(masterNode) + remainingNodes }
}
}
private fun startNodeInternal(legalName: String,
advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>,
configOverrides: Map<String, Any>): ListenableFuture<Node> {
configOverrides: Map<String, Any>): Node {
val config = ConfigHelper.loadConfig(
baseDirectoryPath = (tempFolder.root.toPath() / legalName).createDirectories(),
allowMissingConfig = true,
@ -101,6 +137,6 @@ abstract class NodeBasedTest {
thread(name = legalName) {
node.run()
}
return node.networkMapRegistrationFuture.map { node }
return node
}
}