ENT-1069 Allow for nodes to be started in registration mode (#2065)

* ENT-1069 Allow for nodes to be started in registration mode
This commit is contained in:
Alberto Arri 2017-11-21 09:59:56 +00:00 committed by GitHub
parent 2dc4251cbe
commit a13dbcaa5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 133 additions and 29 deletions

View File

@ -1,5 +1,8 @@
package net.corda.testing.driver package net.corda.testing.driver
import com.sun.net.httpserver.HttpExchange
import com.sun.net.httpserver.HttpHandler
import com.sun.net.httpserver.HttpServer
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.internal.list import net.corda.core.internal.list
@ -13,6 +16,8 @@ import net.corda.testing.ProjectStructure.projectRootDir
import net.corda.testing.node.NotarySpec import net.corda.testing.node.NotarySpec
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.Test import org.junit.Test
import java.net.InetSocketAddress
import java.net.URL
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
@ -51,6 +56,42 @@ class DriverTests {
nodeMustBeDown(nodeHandle) nodeMustBeDown(nodeHandle)
} }
@Test
fun `node registration`() {
// Very simple Http handler which counts the requests it has received and always returns the same payload.
val handler = object : HttpHandler {
private val _requests = mutableListOf<String>()
val requests: List<String>
get() = _requests.toList()
override fun handle(exchange: HttpExchange) {
val response = "reply"
_requests.add(exchange.requestURI.toString())
exchange.responseHeaders.set("Content-Type", "text/html; charset=" + Charsets.UTF_8)
exchange.sendResponseHeaders(200, response.length.toLong())
exchange.responseBody.use { it.write(response.toByteArray()) }
}
}
val inetSocketAddress = InetSocketAddress(0)
val server = HttpServer.create(inetSocketAddress, 0)
val port = server.address.port
server.createContext("/", handler)
server.executor = null // creates a default executor
server.start()
driver(compatibilityZoneURL = URL("http://localhost:$port")) {
// Wait for the notary to have started.
notaryHandles.first().nodeHandles.get()
}
// We're getting:
// a request to sign the certificate then
// at least one poll request to see if the request has been approved.
// all the network map registration and download.
assertThat(handler.requests).startsWith("/certificate", "/certificate/reply")
}
@Test @Test
fun `debug mode enables debug logging level`() { fun `debug mode enables debug logging level`() {
// Make sure we're using the log4j2 config which writes to the log file // Make sure we're using the log4j2 config which writes to the log file

View File

@ -30,6 +30,8 @@ import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.config.* import net.corda.node.services.config.*
import net.corda.node.utilities.ServiceIdentityGenerator import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
import net.corda.node.utilities.registration.NetworkRegistrationHelper
import net.corda.nodeapi.NodeInfoFilesCopier import net.corda.nodeapi.NodeInfoFilesCopier
import net.corda.nodeapi.User import net.corda.nodeapi.User
import net.corda.nodeapi.config.toConfig import net.corda.nodeapi.config.toConfig
@ -319,7 +321,9 @@ data class NodeParameters(
* @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or * @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or
* not. Note that this may be overridden in [DriverDSLExposedInterface.startNode]. * not. Note that this may be overridden in [DriverDSLExposedInterface.startNode].
* @param notarySpecs The notaries advertised in the [NetworkParameters] for this network. These nodes will be started * @param notarySpecs The notaries advertised in the [NetworkParameters] for this network. These nodes will be started
* automatically and will be available from [DriverDSLExposedInterface.notaryHandles]. Defaults to a simple validating notary. * automatically and will be available from [DriverDSLExposedInterface.notaryHandles]. Defaults to a simple validating notary.
* @param compatibilityZoneURL if not null each node is started once in registration mode (which makes the node register and quit),
* and then re-starts the node with the given parameters.
* @param dsl The dsl itself. * @param dsl The dsl itself.
* @return The value returned in the [dsl] closure. * @return The value returned in the [dsl] closure.
*/ */
@ -336,6 +340,7 @@ fun <A> driver(
waitForAllNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish, waitForAllNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish,
notarySpecs: List<NotarySpec> = defaultParameters.notarySpecs, notarySpecs: List<NotarySpec> = defaultParameters.notarySpecs,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan, extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
compatibilityZoneURL: URL? = defaultParameters.compatibilityZoneURL,
dsl: DriverDSLExposedInterface.() -> A dsl: DriverDSLExposedInterface.() -> A
): A { ): A {
return genericDriver( return genericDriver(
@ -349,7 +354,8 @@ fun <A> driver(
startNodesInProcess = startNodesInProcess, startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForAllNodesToFinish, waitForNodesToFinish = waitForAllNodesToFinish,
notarySpecs = notarySpecs, notarySpecs = notarySpecs,
extraCordappPackagesToScan = extraCordappPackagesToScan extraCordappPackagesToScan = extraCordappPackagesToScan,
compatibilityZoneURL = compatibilityZoneURL
), ),
coerce = { it }, coerce = { it },
dsl = dsl, dsl = dsl,
@ -384,7 +390,8 @@ data class DriverParameters(
val startNodesInProcess: Boolean = false, val startNodesInProcess: Boolean = false,
val waitForNodesToFinish: Boolean = false, val waitForNodesToFinish: Boolean = false,
val notarySpecs: List<NotarySpec> = listOf(NotarySpec(DUMMY_NOTARY.name)), val notarySpecs: List<NotarySpec> = listOf(NotarySpec(DUMMY_NOTARY.name)),
val extraCordappPackagesToScan: List<String> = emptyList() val extraCordappPackagesToScan: List<String> = emptyList(),
val compatibilityZoneURL: URL? = null
) { ) {
fun setIsDebug(isDebug: Boolean) = copy(isDebug = isDebug) fun setIsDebug(isDebug: Boolean) = copy(isDebug = isDebug)
fun setDriverDirectory(driverDirectory: Path) = copy(driverDirectory = driverDirectory) fun setDriverDirectory(driverDirectory: Path) = copy(driverDirectory = driverDirectory)
@ -449,6 +456,7 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess, startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
notarySpecs: List<NotarySpec>, notarySpecs: List<NotarySpec>,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan, extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
compatibilityZoneURL: URL? = defaultParameters.compatibilityZoneURL,
driverDslWrapper: (DriverDSL) -> D, driverDslWrapper: (DriverDSL) -> D,
coerce: (D) -> DI, dsl: DI.() -> A coerce: (D) -> DI, dsl: DI.() -> A
): A { ): A {
@ -464,7 +472,8 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
startNodesInProcess = startNodesInProcess, startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish, waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan, extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs notarySpecs = notarySpecs,
compatibilityZoneURL = compatibilityZoneURL
) )
) )
val shutdownHook = addShutdownHook(driverDsl::shutdown) val shutdownHook = addShutdownHook(driverDsl::shutdown)
@ -570,7 +579,8 @@ class DriverDSL(
val startNodesInProcess: Boolean, val startNodesInProcess: Boolean,
val waitForNodesToFinish: Boolean, val waitForNodesToFinish: Boolean,
extraCordappPackagesToScan: List<String>, extraCordappPackagesToScan: List<String>,
val notarySpecs: List<NotarySpec> val notarySpecs: List<NotarySpec>,
val compatibilityZoneURL: URL?
) : DriverDSLInternalInterface { ) : DriverDSLInternalInterface {
private var _executorService: ScheduledExecutorService? = null private var _executorService: ScheduledExecutorService? = null
val executorService get() = _executorService!! val executorService get() = _executorService!!
@ -644,25 +654,50 @@ class DriverDSL(
maximumHeapSize: String maximumHeapSize: String
): CordaFuture<NodeHandle> { ): CordaFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort() val p2pAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name // TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB") val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB")
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) } val registrationFuture = compatibilityZoneURL?.let { registerNode(name, it) } ?: doneFuture(Unit)
return registrationFuture.flatMap {
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = configOf(
"myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"useTestClock" to useTestClock,
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
) + customOverrides
)
startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
}
}
private fun registerNode(providedName: CordaX500Name, compatibilityZoneURL: URL): CordaFuture<Unit> {
val config = ConfigHelper.loadConfig( val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name), baseDirectory = baseDirectory(providedName),
allowMissingConfig = true, allowMissingConfig = true,
configOverrides = configOf( configOverrides = configOf(
"myLegalName" to name.toString(), "p2pAddress" to "localhost:1222", // required argument, not really used
"p2pAddress" to p2pAddress.toString(), "compatibilityZoneURL" to compatibilityZoneURL.toString(),
"rpcAddress" to rpcAddress.toString(), "myLegalName" to providedName.toString())
"webAddress" to webAddress.toString(),
"useTestClock" to useTestClock,
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
) + customOverrides
) )
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize) if (startNodesInProcess) {
// This is a bit cheating, we're not starting a full node, we're just calling the code nodes call
// when registering.
val configuration = config.parseAsNodeConfiguration()
NetworkRegistrationHelper(configuration, HTTPNetworkRegistrationService(compatibilityZoneURL))
.buildKeystore()
return doneFuture(Unit)
} else {
return startNodeForRegistration(config)
}
} }
override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?, maximumHeapSize: String): List<CordaFuture<NodeHandle>> { override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?, maximumHeapSize: String): List<CordaFuture<NodeHandle>> {
@ -679,7 +714,8 @@ class DriverDSL(
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers "rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers
) )
) )
startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize) val registrationFuture = compatibilityZoneURL?.let { registerNode(name, it) } ?: doneFuture(Unit)
registrationFuture.flatMap { startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize) }
} }
} }
@ -856,6 +892,20 @@ class DriverDSL(
return future return future
} }
private fun startNodeForRegistration(config: Config): CordaFuture<Unit> {
val maximumHeapSize = "200m"
val configuration = config.parseAsNodeConfiguration()
configuration.baseDirectory.createDirectories()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort,
systemProperties, cordappPackages, maximumHeapSize, initialRegistration = true)
return poll(executorService, "process exit") {
if (process.isAlive) null else Unit
}
}
private fun startNodeInternal(config: Config, private fun startNodeInternal(config: Config,
webAddress: NetworkHostAndPort, webAddress: NetworkHostAndPort,
startInProcess: Boolean?, startInProcess: Boolean?,
@ -887,7 +937,8 @@ class DriverDSL(
} }
} else { } else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort, systemProperties, cordappPackages, maximumHeapSize) val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort,
systemProperties, cordappPackages, maximumHeapSize, initialRegistration = false)
if (waitForNodesToFinish) { if (waitForNodesToFinish) {
state.locked { state.locked {
processes += process processes += process
@ -967,7 +1018,8 @@ class DriverDSL(
debugPort: Int?, debugPort: Int?,
overriddenSystemProperties: Map<String, String>, overriddenSystemProperties: Map<String, String>,
cordappPackages: List<String>, cordappPackages: List<String>,
maximumHeapSize: String maximumHeapSize: String,
initialRegistration: Boolean
): Process { ): Process {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled")) log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled"))
// Write node.conf // Write node.conf
@ -991,13 +1043,18 @@ class DriverDSL(
"-javaagent:$quasarJarPath=$excludePattern" "-javaagent:$quasarJarPath=$excludePattern"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG" val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
return ProcessUtilities.startCordaProcess( val arguments = mutableListOf<String>(
"--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel",
"--no-local-shell").also {
if (initialRegistration) {
it += "--initial-registration"
}
}.toList()
return ProcessUtilities.startCordaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = listOf( arguments = arguments,
"--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel",
"--no-local-shell"
),
jdwpPort = debugPort, jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments, extraJvmArguments = extraJvmArguments,
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log", errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
import java.lang.reflect.Method import java.lang.reflect.Method
import java.net.URL
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import java.util.* import java.util.*
@ -236,6 +237,7 @@ fun <A> rpcDriver(
extraCordappPackagesToScan: List<String> = emptyList(), extraCordappPackagesToScan: List<String> = emptyList(),
notarySpecs: List<NotarySpec> = emptyList(), notarySpecs: List<NotarySpec> = emptyList(),
externalTrace: Trace? = null, externalTrace: Trace? = null,
compatibilityZoneURL: URL? = null,
dsl: RPCDriverExposedDSLInterface.() -> A dsl: RPCDriverExposedDSLInterface.() -> A
) = genericDriver( ) = genericDriver(
driverDsl = RPCDriverDSL( driverDsl = RPCDriverDSL(
@ -249,7 +251,8 @@ fun <A> rpcDriver(
startNodesInProcess = startNodesInProcess, startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish, waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan, extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs notarySpecs = notarySpecs,
compatibilityZoneURL = compatibilityZoneURL
), externalTrace ), externalTrace
), ),
coerce = { it }, coerce = { it },

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.security.CheckType
import org.apache.activemq.artemis.core.security.Role import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager
import java.net.URL
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
@ -85,6 +86,7 @@ fun <A> verifierDriver(
waitForNodesToFinish: Boolean = false, waitForNodesToFinish: Boolean = false,
extraCordappPackagesToScan: List<String> = emptyList(), extraCordappPackagesToScan: List<String> = emptyList(),
notarySpecs: List<NotarySpec> = emptyList(), notarySpecs: List<NotarySpec> = emptyList(),
compatibilityZoneURL: URL? = null,
dsl: VerifierExposedDSLInterface.() -> A dsl: VerifierExposedDSLInterface.() -> A
) = genericDriver( ) = genericDriver(
driverDsl = VerifierDriverDSL( driverDsl = VerifierDriverDSL(
@ -98,7 +100,8 @@ fun <A> verifierDriver(
startNodesInProcess = startNodesInProcess, startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish, waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan, extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs notarySpecs = notarySpecs,
compatibilityZoneURL = compatibilityZoneURL
) )
), ),
coerce = { it }, coerce = { it },