From de63f90745781a56a1aeffbc9901c902cde15659 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 2 Feb 2017 10:54:50 +0000 Subject: [PATCH] Add isRegisteredWithNetworkMap RPC, poll in Driver, expose RPC proxy --- .../net/corda/client/NodeMonitorModelTest.kt | 2 +- .../net/corda/core/messaging/CordaRPCOps.kt | 5 +++ .../kotlin/net/corda/node/driver/Driver.kt | 38 ++++++++++++------- .../corda/node/internal/CordaRPCOpsImpl.kt | 2 + 4 files changed, 32 insertions(+), 15 deletions(-) diff --git a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt index 4847dc943d..e0a8d6b1b3 100644 --- a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt +++ b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt @@ -124,7 +124,7 @@ class NodeMonitorModelTest : DriverBasedTest() { issueRef = OpaqueBytes(ByteArray(1, { 1 })), recipient = aliceNode.legalIdentity, notary = notaryNode.notaryIdentity - )) + )).returnValue.toBlocking().first() rpc.startFlow(::CashFlow, CashCommand.PayCash( amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 69eea24dc5..24c84275bf 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -111,6 +111,11 @@ interface CordaRPCOps : RPCOps { */ fun currentNodeTime(): Instant + /** + * Returns whether the node has registered with the network map. + */ + fun isRegisteredWithNetworkMap(): Boolean + // TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of // the node's state locally and query that directly. /** diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index c59f08b255..d57d8ff232 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -10,6 +10,7 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigRenderOptions import net.corda.core.* import net.corda.core.crypto.Party +import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.NodeInfo import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType @@ -31,11 +32,9 @@ import java.io.File import java.net.* import java.nio.file.Path import java.nio.file.Paths -import java.time.Duration import java.time.Instant import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter -import java.time.temporal.ChronoUnit import java.util.* import java.util.concurrent.* import java.util.concurrent.TimeUnit.MILLISECONDS @@ -102,6 +101,7 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface { data class NodeHandle( val nodeInfo: NodeInfo, + val rpc: CordaRPCOps, val configuration: FullNodeConfiguration, val process: Process ) { @@ -327,14 +327,16 @@ open class DriverDSL( executorService.shutdown() } - private fun queryNodeInfo(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): NodeInfo? { - while (true) try { - val client = CordaRPCClient(nodeAddress, sslConfig) - client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER) - val rpcOps = client.proxy(timeout = Duration.of(15, ChronoUnit.SECONDS)) - return rpcOps.nodeIdentity() - } catch(e: Exception) { - log.error("Retrying query node info at $nodeAddress") + private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): ListenableFuture { + val client = CordaRPCClient(nodeAddress, sslConfig) + return poll(executorService, "for RPC connection") { + try { + client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER) + return@poll client.proxy() + } catch(e: Exception) { + log.error("Retrying query node info at $nodeAddress") + null + } } } @@ -374,10 +376,18 @@ open class DriverDSL( ) ) - val startNode = startNode(executorService, configuration, quasarJarPath, debugPort) - registerProcess(startNode) - return startNode.map { - NodeHandle(queryNodeInfo(messagingAddress, configuration)!!, configuration, it) + val processFuture = startNode(executorService, configuration, quasarJarPath, debugPort) + registerProcess(processFuture) + return processFuture.flatMap { process -> + establishRpc(messagingAddress, configuration).flatMap { rpc -> + poll(executorService, "$name to register with the network map") { + if (rpc.isRegisteredWithNetworkMap()) { + NodeHandle(rpc.nodeIdentity(), rpc, configuration, process) + } else { + null + } + } + } } } diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index fedca799b1..1d74e409cd 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -111,6 +111,8 @@ class CordaRPCOpsImpl( } } + override fun isRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered.isDone + override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key) override fun partyFromName(name: String) = services.identityService.partyFromName(name)