node-driver: Add startClient, startLocalServer

This commit is contained in:
Andras Slemmer
2016-08-18 11:25:18 +01:00
parent 5f33bedc13
commit f4577b743e

View File

@ -4,13 +4,14 @@ import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.ArtemisMessagingClient
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.utilities.AffinityExecutor import com.r3corda.node.utilities.AffinityExecutor
@ -56,11 +57,33 @@ interface DriverDSLExposedInterface {
* @return The [NodeInfo] of the started up node retrieved from the network map service. * @return The [NodeInfo] of the started up node retrieved from the network map service.
*/ */
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceType> = setOf()): Future<NodeInfo> fun startNode(providedName: String? = null, advertisedServices: Set<ServiceType> = setOf()): Future<NodeInfo>
/**
* Starts an [ArtemisMessagingClient]
*
* @param providedName name of the client, which will be used for creating its directory
* @param serverAddress the artemis server to connect to, for example a [Node]
* @param clientAddress the address of the client (this is not bound by the client!), defaults to [serverAddress] if null
*/
fun startClient(providedName: String, serverAddress: HostAndPort, clientAddress: HostAndPort?): Future<ArtemisMessagingClient>
/**
* Starts a local [ArtemisMessagingServer] of which there may only be one.
*/
fun startLocalServer(): Future<ArtemisMessagingServer>
fun waitForAllNodesToFinish() fun waitForAllNodesToFinish()
val messagingService: MessagingService
val networkMapCache: NetworkMapCache val networkMapCache: NetworkMapCache
} }
fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) =
startClient("driver-local-server-client", localServer.myHostPort, localServer.myHostPort)
fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) =
startClient(
providedName = providedName ?: "${remoteNodeInfo.identity.name}-client",
serverAddress = (remoteNodeInfo.address as ArtemisMessagingComponent.Address).hostAndPort,
clientAddress = null
)
interface DriverDSLInternalInterface : DriverDSLExposedInterface { interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun start() fun start()
fun shutdown() fun shutdown()
@ -91,7 +114,6 @@ sealed class PortAllocation {
* [NodeInfo] that may be waited on, which guarantees that the new node registered with the network map service. * [NodeInfo] that may be waited on, which guarantees that the new node registered with the network map service.
* *
* The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache] * The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache]
* The driver is an artemis node itself, the messaging service may be accessed by [DriverDSL.messagingService]
* *
* @param baseDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node * @param baseDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
* directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>" * directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>"
@ -215,25 +237,6 @@ class DriverDSL(
Paths.get(quasarFileUrl.toURI()).toString() Paths.get(quasarFileUrl.toURI()).toString()
} }
val driverNodeConfiguration = NodeConfigurationFromConfig(
NodeConfiguration.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, "driver-artemis"),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to "driver-artemis"
)
)
)
override val messagingService = ArtemisMessagingClient(
Paths.get(baseDirectory, "driver-artemis"),
driverNodeConfiguration,
serverHostPort = networkMapAddress,
myHostPort = portAllocation.nextHostAndPort(),
executor = AffinityExecutor.ServiceAffinityExecutor("Client thread", 1)
)
var messagingServiceStarted = false
fun registerProcess(process: Process) = state.locked { registeredProcesses.push(process) } fun registerProcess(process: Process) = state.locked { registeredProcesses.push(process) }
override fun waitForAllNodesToFinish() { override fun waitForAllNodesToFinish() {
@ -246,6 +249,10 @@ class DriverDSL(
override fun shutdown() { override fun shutdown() {
state.locked { state.locked {
clients.forEach {
it.stop()
}
localServer?.stop()
registeredProcesses.forEach(Process::destroy) registeredProcesses.forEach(Process::destroy)
} }
/** Wait 5 seconds, then [Process.destroyForcibly] */ /** Wait 5 seconds, then [Process.destroyForcibly] */
@ -309,12 +316,70 @@ class DriverDSL(
}) })
} }
override fun startClient(
providedName: String,
serverAddress: HostAndPort,
clientAddress: HostAndPort?
): Future<ArtemisMessagingClient> {
val nodeConfiguration = NodeConfigurationFromConfig(
NodeConfiguration.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, providedName),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to providedName
)
)
)
val client = ArtemisMessagingClient(
Paths.get(baseDirectory, providedName),
nodeConfiguration,
serverHostPort = serverAddress,
myHostPort = clientAddress ?: serverAddress,
executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1)
)
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingClient> {
client.configureWithDevSSLCertificate()
client.start()
thread { client.run() }
state.locked {
clients.add(client)
}
client
})
}
override fun startLocalServer(): Future<ArtemisMessagingServer> {
val name = "driver-local-server"
val config = NodeConfigurationFromConfig(
NodeConfiguration.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, name),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to name
)
)
)
val server = ArtemisMessagingServer(
Paths.get(baseDirectory, name),
config,
portAllocation.nextHostAndPort()
)
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingServer> {
server.configureWithDevSSLCertificate()
server.start()
state.locked {
localServer = server
}
server
})
}
override fun start() { override fun start() {
startNetworkMapService() startNetworkMapService()
messagingService.configureWithDevSSLCertificate() val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress, portAllocation.nextHostAndPort()).get()
messagingService.start()
thread { messagingService.run() }
messagingServiceStarted = true
// We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from
// the network map service itself // the network map service itself
val fakeNodeInfo = NodeInfo( val fakeNodeInfo = NodeInfo(
@ -325,7 +390,7 @@ class DriverDSL(
), ),
advertisedServices = setOf(NetworkMapService.Type) advertisedServices = setOf(NetworkMapService.Type)
) )
networkMapCache.addMapService(messagingService, fakeNodeInfo, true) networkMapCache.addMapService(networkMapClient, fakeNodeInfo, true)
networkMapNodeInfo = poll("network map cache for $networkMapName") { networkMapNodeInfo = poll("network map cache for $networkMapName") {
networkMapCache.partyNodes.forEach { networkMapCache.partyNodes.forEach {
if (it.identity.name == networkMapName) { if (it.identity.name == networkMapName) {