Add isRegisteredWithNetworkMap RPC, poll in Driver, expose RPC proxy

This commit is contained in:
Andras Slemmer 2017-02-02 10:54:50 +00:00
parent 3f6c8ab1e2
commit de63f90745
4 changed files with 32 additions and 15 deletions

View File

@ -124,7 +124,7 @@ class NodeMonitorModelTest : DriverBasedTest() {
issueRef = OpaqueBytes(ByteArray(1, { 1 })), issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.legalIdentity, recipient = aliceNode.legalIdentity,
notary = notaryNode.notaryIdentity notary = notaryNode.notaryIdentity
)) )).returnValue.toBlocking().first()
rpc.startFlow(::CashFlow, CashCommand.PayCash( rpc.startFlow(::CashFlow, CashCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),

View File

@ -111,6 +111,11 @@ interface CordaRPCOps : RPCOps {
*/ */
fun currentNodeTime(): Instant fun currentNodeTime(): Instant
/**
* Returns whether the node has registered with the network map.
*/
fun isRegisteredWithNetworkMap(): Boolean
// TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of // TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of
// the node's state locally and query that directly. // the node's state locally and query that directly.
/** /**

View File

@ -10,6 +10,7 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions import com.typesafe.config.ConfigRenderOptions
import net.corda.core.* import net.corda.core.*
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType import net.corda.core.node.services.ServiceType
@ -31,11 +32,9 @@ import java.io.File
import java.net.* import java.net.*
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import java.time.Duration
import java.time.Instant import java.time.Instant
import java.time.ZoneOffset.UTC import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.* import java.util.*
import java.util.concurrent.* import java.util.concurrent.*
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
@ -102,6 +101,7 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface {
data class NodeHandle( data class NodeHandle(
val nodeInfo: NodeInfo, val nodeInfo: NodeInfo,
val rpc: CordaRPCOps,
val configuration: FullNodeConfiguration, val configuration: FullNodeConfiguration,
val process: Process val process: Process
) { ) {
@ -327,14 +327,16 @@ open class DriverDSL(
executorService.shutdown() executorService.shutdown()
} }
private fun queryNodeInfo(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): NodeInfo? { private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): ListenableFuture<CordaRPCOps> {
while (true) try { val client = CordaRPCClient(nodeAddress, sslConfig)
val client = CordaRPCClient(nodeAddress, sslConfig) return poll(executorService, "for RPC connection") {
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER) try {
val rpcOps = client.proxy(timeout = Duration.of(15, ChronoUnit.SECONDS)) client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
return rpcOps.nodeIdentity() return@poll client.proxy()
} catch(e: Exception) { } catch(e: Exception) {
log.error("Retrying query node info at $nodeAddress") log.error("Retrying query node info at $nodeAddress")
null
}
} }
} }
@ -374,10 +376,18 @@ open class DriverDSL(
) )
) )
val startNode = startNode(executorService, configuration, quasarJarPath, debugPort) val processFuture = startNode(executorService, configuration, quasarJarPath, debugPort)
registerProcess(startNode) registerProcess(processFuture)
return startNode.map { return processFuture.flatMap { process ->
NodeHandle(queryNodeInfo(messagingAddress, configuration)!!, configuration, it) establishRpc(messagingAddress, configuration).flatMap { rpc ->
poll(executorService, "$name to register with the network map") {
if (rpc.isRegisteredWithNetworkMap()) {
NodeHandle(rpc.nodeIdentity(), rpc, configuration, process)
} else {
null
}
}
}
} }
} }

View File

@ -111,6 +111,8 @@ class CordaRPCOpsImpl(
} }
} }
override fun isRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered.isDone
override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key) override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key)
override fun partyFromName(name: String) = services.identityService.partyFromName(name) override fun partyFromName(name: String) = services.identityService.partyFromName(name)