Merge pull request #207 from corda/aslemmer-driver-poll-nms-registration

Add isRegisteredWithNetworkMap RPC, poll in Driver, expose RPC proxy
This commit is contained in:
Andras Slemmer 2017-02-02 13:14:44 +00:00 committed by GitHub
commit 8b258b9415
4 changed files with 28 additions and 15 deletions

View File

@ -124,7 +124,7 @@ class NodeMonitorModelTest : DriverBasedTest() {
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.legalIdentity,
notary = notaryNode.notaryIdentity
))
)).returnValue.toBlocking().first()
rpc.startFlow(::CashFlow, CashCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),

View File

@ -111,6 +111,12 @@ interface CordaRPCOps : RPCOps {
*/
fun currentNodeTime(): Instant
/**
* Returns an Observable emitting a single Unit once the node is registered with the network map.
*/
@RPCReturnsObservables
fun waitUntilRegisteredWithNetworkMap(): Observable<Unit>
// TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of
// the node's state locally and query that directly.
/**

View File

@ -10,6 +10,7 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
@ -31,11 +32,9 @@ import java.io.File
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.TimeUnit.MILLISECONDS
@ -102,6 +101,7 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface {
data class NodeHandle(
val nodeInfo: NodeInfo,
val rpc: CordaRPCOps,
val configuration: FullNodeConfiguration,
val process: Process
) {
@ -327,14 +327,16 @@ open class DriverDSL(
executorService.shutdown()
}
private fun queryNodeInfo(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): NodeInfo? {
while (true) try {
val client = CordaRPCClient(nodeAddress, sslConfig)
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
val rpcOps = client.proxy(timeout = Duration.of(15, ChronoUnit.SECONDS))
return rpcOps.nodeIdentity()
} catch(e: Exception) {
log.error("Retrying query node info at $nodeAddress")
private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): ListenableFuture<CordaRPCOps> {
val client = CordaRPCClient(nodeAddress, sslConfig)
return poll(executorService, "for RPC connection") {
try {
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
return@poll client.proxy()
} catch(e: Exception) {
log.error("Exception $e, Retrying RPC connection at $nodeAddress")
null
}
}
}
@ -374,10 +376,14 @@ open class DriverDSL(
)
)
val startNode = startNode(executorService, configuration, quasarJarPath, debugPort)
registerProcess(startNode)
return startNode.map {
NodeHandle(queryNodeInfo(messagingAddress, configuration)!!, configuration, it)
val processFuture = startNode(executorService, configuration, quasarJarPath, debugPort)
registerProcess(processFuture)
return processFuture.flatMap { process ->
establishRpc(messagingAddress, configuration).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().toFuture().map {
NodeHandle(rpc.nodeIdentity(), rpc, configuration, process)
}
}
}
}

View File

@ -111,6 +111,7 @@ class CordaRPCOpsImpl(
}
}
override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered.toObservable()
override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key)
override fun partyFromName(name: String) = services.identityService.partyFromName(name)