mirror of
https://github.com/corda/corda.git
synced 2025-06-18 07:08:15 +00:00
Added a method to NodeHandle to simplify using RPC in the Driver
This commit is contained in:
@ -53,7 +53,7 @@ class DriverTests {
|
||||
|
||||
@Test
|
||||
fun randomFreePortAllocationWorks() {
|
||||
val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) {
|
||||
val nodeInfo = driver(portAllocation = PortAllocation.RandomFree) {
|
||||
val nodeInfo = startNode("NoService")
|
||||
nodeMustBeUp(nodeInfo.getOrThrow().nodeInfo)
|
||||
nodeInfo.getOrThrow()
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import net.corda.core.bufferUntilSubscribed
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.contracts.POUNDS
|
||||
import net.corda.core.contracts.issuedBy
|
||||
import net.corda.core.crypto.Party
|
||||
@ -15,9 +16,6 @@ import net.corda.flows.CashFlowResult
|
||||
import net.corda.node.driver.DriverBasedTest
|
||||
import net.corda.node.driver.NodeHandle
|
||||
import net.corda.node.driver.driver
|
||||
import net.corda.node.services.config.configureTestSSL
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent
|
||||
import net.corda.node.services.messaging.CordaRPCClient
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.testing.expect
|
||||
import net.corda.testing.expectEvents
|
||||
@ -28,14 +26,14 @@ import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class DistributedServiceTests : DriverBasedTest() {
|
||||
lateinit var alice: NodeInfo
|
||||
lateinit var alice: NodeHandle
|
||||
lateinit var notaries: List<NodeHandle>
|
||||
lateinit var aliceProxy: CordaRPCOps
|
||||
lateinit var raftNotaryIdentity: Party
|
||||
lateinit var notaryStateMachines: Observable<Pair<NodeInfo, StateMachineUpdate>>
|
||||
|
||||
override fun setup() = driver {
|
||||
// Start Alice and 3 raft notaries
|
||||
// Start Alice and 3 notaries in a RAFT cluster
|
||||
val clusterSize = 3
|
||||
val testUser = User("test", "test", permissions = setOf(startFlowPermission<CashFlow>()))
|
||||
val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser))
|
||||
@ -46,7 +44,7 @@ class DistributedServiceTests : DriverBasedTest() {
|
||||
type = RaftValidatingNotaryService.type
|
||||
)
|
||||
|
||||
alice = aliceFuture.get().nodeInfo
|
||||
alice = aliceFuture.get()
|
||||
val (notaryIdentity, notaryNodes) = notariesFuture.get()
|
||||
raftNotaryIdentity = notaryIdentity
|
||||
notaries = notaryNodes
|
||||
@ -55,14 +53,14 @@ class DistributedServiceTests : DriverBasedTest() {
|
||||
assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size)
|
||||
|
||||
// Connect to Alice and the notaries
|
||||
fun connectRpc(node: NodeInfo): CordaRPCOps {
|
||||
val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL())
|
||||
fun connectRpc(node: NodeHandle): CordaRPCOps {
|
||||
val client = node.rpcClientToNode()
|
||||
client.start("test", "test")
|
||||
return client.proxy()
|
||||
}
|
||||
aliceProxy = connectRpc(alice)
|
||||
val notaryProxies = notaries.map { connectRpc(it.nodeInfo) }
|
||||
notaryStateMachines = Observable.from(notaryProxies.map { proxy ->
|
||||
val rpcClientsToNotaries = notaries.map(::connectRpc)
|
||||
notaryStateMachines = Observable.from(rpcClientsToNotaries.map { proxy ->
|
||||
proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) }
|
||||
}).flatMap { it.onErrorResumeNext(Observable.empty()) }.bufferUntilSubscribed()
|
||||
|
||||
@ -74,11 +72,10 @@ class DistributedServiceTests : DriverBasedTest() {
|
||||
@Test
|
||||
fun `requests are distributed evenly amongst the nodes`() {
|
||||
// Issue 100 pounds, then pay ourselves 50x2 pounds
|
||||
val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
|
||||
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
|
||||
issueCash(100.POUNDS)
|
||||
|
||||
for (i in 1..50) {
|
||||
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(2.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
|
||||
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
|
||||
paySelf(2.POUNDS)
|
||||
}
|
||||
|
||||
// The state machines added in the notaries should map one-to-one to notarisation requests
|
||||
@ -104,11 +101,10 @@ class DistributedServiceTests : DriverBasedTest() {
|
||||
@Test
|
||||
fun `cluster survives if a notary is killed`() {
|
||||
// Issue 100 pounds, then pay ourselves 10x5 pounds
|
||||
val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
|
||||
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
|
||||
issueCash(100.POUNDS)
|
||||
|
||||
for (i in 1..10) {
|
||||
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
|
||||
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
|
||||
paySelf(5.POUNDS)
|
||||
}
|
||||
|
||||
// Now kill a notary
|
||||
@ -119,8 +115,7 @@ class DistributedServiceTests : DriverBasedTest() {
|
||||
|
||||
// Pay ourselves another 20x5 pounds
|
||||
for (i in 1..20) {
|
||||
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
|
||||
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
|
||||
paySelf(5.POUNDS)
|
||||
}
|
||||
|
||||
val notarisationsPerNotary = HashMap<Party, Int>()
|
||||
@ -137,4 +132,18 @@ class DistributedServiceTests : DriverBasedTest() {
|
||||
println("Notarisation distribution: $notarisationsPerNotary")
|
||||
require(notarisationsPerNotary.size == 3)
|
||||
}
|
||||
|
||||
private fun issueCash(amount: Amount<Currency>) {
|
||||
val issueHandle = aliceProxy.startFlow(
|
||||
::CashFlow,
|
||||
CashCommand.IssueCash(amount, OpaqueBytes.of(0), alice.nodeInfo.legalIdentity, raftNotaryIdentity))
|
||||
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
|
||||
}
|
||||
|
||||
private fun paySelf(amount: Amount<Currency>) {
|
||||
val payHandle = aliceProxy.startFlow(
|
||||
::CashFlow,
|
||||
CashCommand.PayCash(amount.issuedBy(alice.nodeInfo.legalIdentity.ref(0)), alice.nodeInfo.legalIdentity))
|
||||
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
|
||||
}
|
||||
}
|
@ -19,6 +19,7 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.config.ConfigHelper
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.messaging.CordaRPCClient
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
@ -93,9 +94,11 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface {
|
||||
|
||||
data class NodeHandle(
|
||||
val nodeInfo: NodeInfo,
|
||||
val config: Config,
|
||||
val configuration: FullNodeConfiguration,
|
||||
val process: Process
|
||||
)
|
||||
) {
|
||||
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.artemisAddress, configuration)
|
||||
}
|
||||
|
||||
sealed class PortAllocation {
|
||||
abstract fun nextPort(): Int
|
||||
@ -106,7 +109,7 @@ sealed class PortAllocation {
|
||||
override fun nextPort() = portCounter.andIncrement
|
||||
}
|
||||
|
||||
class RandomFree() : PortAllocation() {
|
||||
object RandomFree : PortAllocation() {
|
||||
override fun nextPort(): Int {
|
||||
return ServerSocket().use {
|
||||
it.bind(InetSocketAddress(0))
|
||||
@ -364,16 +367,16 @@ open class DriverDSL(
|
||||
}
|
||||
) + customOverrides
|
||||
|
||||
val config = ConfigHelper.loadConfig(
|
||||
val configuration = FullNodeConfiguration(ConfigHelper.loadConfig(
|
||||
baseDirectoryPath = baseDirectory,
|
||||
allowMissingConfig = true,
|
||||
configOverrides = configOverrides
|
||||
)
|
||||
))
|
||||
|
||||
val startNode = startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort)
|
||||
val startNode = startNode(executorService, configuration, quasarJarPath, debugPort)
|
||||
registerProcess(startNode)
|
||||
return startNode.map {
|
||||
NodeHandle(queryNodeInfo(apiAddress)!!, config, it)
|
||||
NodeHandle(queryNodeInfo(apiAddress)!!, configuration, it)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,6 +61,8 @@ class FullNodeConfiguration(val config: Config) : NodeConfiguration {
|
||||
val useHTTPS: Boolean by config
|
||||
val artemisAddress: HostAndPort by config
|
||||
val webAddress: HostAndPort by config
|
||||
// TODO This field is slightly redundant as artemisAddress is sufficient to hold the address of the node's MQ broker.
|
||||
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
|
||||
val messagingServerAddress: HostAndPort? by config.getOrElse { null }
|
||||
val extraAdvertisedServiceIds: String by config
|
||||
val useTestClock: Boolean by config.getOrElse { false }
|
||||
|
Reference in New Issue
Block a user