Moved the webserver code into the main node module. Driver can now start webserver for nodes.

This commit is contained in:
Clinton Alexander
2016-12-23 15:49:15 +00:00
committed by Clinton Alexander
parent ecfb762143
commit 5f4d4c1da3
17 changed files with 145 additions and 102 deletions

View File

@ -1,5 +1,6 @@
package net.corda.node.driver
import com.google.common.net.HostAndPort
import net.corda.core.getOrThrow
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
@ -25,6 +26,14 @@ class DriverTests {
// Check that the port is bound
addressMustNotBeBound(executorService, hostAndPort)
}
fun webserverMustBeUp(webserverAddr: HostAndPort) {
addressMustBeBound(executorService, webserverAddr)
}
fun webserverMustBeDown(webserverAddr: HostAndPort) {
addressMustNotBeBound(executorService, webserverAddr)
}
}
@Test
@ -60,4 +69,15 @@ class DriverTests {
}
nodeMustBeDown(nodeInfo.nodeInfo)
}
@Test
fun `starting a node and independent web server works`() {
val addr = driver {
val node = startNode("test").getOrThrow()
val webserverAddr = startWebserver(node).getOrThrow()
webserverMustBeUp(webserverAddr)
webserverAddr
}
webserverMustBeDown(addr)
}
}

View File

@ -2,8 +2,6 @@
package net.corda.node.driver
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
@ -12,7 +10,6 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
@ -22,21 +19,21 @@ import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.NodeSSLConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.JsonSupport
import net.corda.node.utilities.ServiceIdentityGenerator
import org.slf4j.Logger
import java.io.File
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.Future
@ -88,6 +85,13 @@ interface DriverDSLExposedInterface {
type: ServiceType = RaftValidatingNotaryService.type,
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>>
/**
* Starts a web server for a node
*
* @param handle The handle for the node that this webserver connects to via RPC.
*/
fun startWebserver(handle: NodeHandle): Future<HostAndPort>
fun waitForAllNodesToFinish()
}
@ -322,16 +326,20 @@ open class DriverDSL(
executorService.shutdown()
}
private fun queryNodeInfo(webAddress: HostAndPort, sslConfig: NodeSSLConfiguration): NodeInfo? {
try {
val client = CordaRPCClient(webAddress, sslConfig)
private fun queryNodeInfo(nodeAddress: HostAndPort, sslConfig: NodeSSLConfiguration): NodeInfo? {
var retries = 0
while (retries < 5) try {
val client = CordaRPCClient(nodeAddress, sslConfig)
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
val rpcOps = client.proxy()
return rpcOps.nodeIdentity()
val rpcOps = client.proxy(timeout = Duration.of(15, ChronoUnit.SECONDS))
return rpcOps.nodeIdentity()
} catch(e: Exception) {
log.error("Could not query node info at $webAddress due to an exception.", e)
return null
log.error("Retrying query node info at $nodeAddress")
retries++
}
log.error("Could not query node info after $retries retries")
return null
}
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>,
@ -408,6 +416,15 @@ open class DriverDSL(
}
}
override fun startWebserver(handle: NodeHandle): Future<HostAndPort> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
return future {
registerProcess(DriverDSL.startWebserver(executorService, handle.configuration, debugPort))
handle.configuration.webAddress
}
}
override fun start() {
startNetworkMapService()
}
@ -480,11 +497,37 @@ open class DriverDSL(
builder.inheritIO()
builder.directory(nodeConf.baseDirectory.toFile())
val process = builder.start()
return Futures.allAsList(
addressMustBeBound(executorService, nodeConf.artemisAddress),
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
// the handlers for the advertised services are not yet registered. Needs rethinking.
).map { process }
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
// the handlers for the advertised services are not yet registered. Needs rethinking.
return addressMustBeBound(executorService, nodeConf.artemisAddress).map { process }
}
private fun startWebserver(
executorService: ScheduledExecutorService,
nodeConf: FullNodeConfiguration,
debugPort: Int?): ListenableFuture<Process> {
val className = "net.corda.node.webserver.MainKt" // cannot directly get class for this, so just use string
val separator = System.getProperty("file.separator")
val classpath = System.getProperty("java.class.path")
val path = System.getProperty("java.home") + separator + "bin" + separator + "java"
val debugPortArg = if (debugPort != null)
listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort")
else
emptyList()
val javaArgs = listOf(path) +
listOf("-Dname=node-${nodeConf.artemisAddress}-webserver") + debugPortArg +
listOf(
"-cp", classpath, className,
"--base-directory", nodeConf.baseDirectory.toString(),
"--web-address", nodeConf.webAddress.toString())
val builder = ProcessBuilder(javaArgs)
builder.redirectError(Paths.get("error.$className.log").toFile())
builder.inheritIO()
builder.directory(nodeConf.baseDirectory.toFile())
val process = builder.start()
return addressMustBeBound(executorService, nodeConf.webAddress).map { process }
}
}
}

View File

@ -6,13 +6,12 @@ import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.div
import net.corda.core.flatMap
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.getOrThrow
import net.corda.core.messaging.RPCOps
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.success
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.serialization.NodeClock
@ -20,39 +19,19 @@ import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.servlets.AttachmentDownloadServlet
import net.corda.node.servlets.Config
import net.corda.node.servlets.DataUploadServlet
import net.corda.node.servlets.ResponseFilter
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.databaseTransaction
import org.eclipse.jetty.server.*
import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.servlet.DefaultServlet
import org.eclipse.jetty.servlet.FilterHolder
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.webapp.WebAppContext
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer
import org.jetbrains.exposed.sql.Database
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.lang.reflect.InvocationTargetException
import java.net.InetAddress
import java.nio.channels.FileLock
import java.time.Clock
import java.util.*
import javax.management.ObjectName
import javax.servlet.*
import kotlin.concurrent.thread
@ -293,6 +272,7 @@ class Node(override val configuration: FullNodeConfiguration,
chain.doFilter(request, response)
}
}
override fun init(filterConfig: FilterConfig?) {}
override fun destroy() {}
}

View File

@ -0,0 +1,9 @@
package net.corda.node.utilities
import com.google.common.net.HostAndPort
import com.typesafe.config.Config
import java.nio.file.Path
import java.nio.file.Paths
fun Config.getHostAndPort(name: String): HostAndPort = HostAndPort.fromString(getString(name))
fun Config.getPath(name: String): Path = Paths.get(getString(name))

View File

@ -0,0 +1,44 @@
package net.corda.node.webserver
import joptsimple.OptionParser
import net.corda.node.driver.driver
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import java.io.File
import java.nio.file.Paths
import kotlin.system.exitProcess
fun main(args: Array<String>) {
// TODO: Print basic webserver info
val parser = OptionParser()
val baseDirectoryArg = parser.accepts("base-directory", "The directory to put all files under").withRequiredArg()
val webAddressArg = parser.accepts("web-address", "The web address for this server to bind").withOptionalArg()
val logToConsoleArg = parser.accepts("log-to-console", "If set, prints logging to the console as well as to a file.")
val helpArg = parser.accepts("help").forHelp()
val cmdlineOptions = try {
parser.parse(*args)
} catch (ex: Exception) {
println("Unknown command line arguments: ${ex.message}")
exitProcess(1)
}
// Maybe render command line help.
if (cmdlineOptions.has(helpArg)) {
parser.printHelpOn(System.out)
exitProcess(0)
}
// Set up logging.
if (cmdlineOptions.has(logToConsoleArg)) {
// This property is referenced from the XML config file.
System.setProperty("consoleLogLevel", "info")
}
val baseDirectoryPath = Paths.get(cmdlineOptions.valueOf(baseDirectoryArg))
val config = ConfigHelper.loadConfig(baseDirectoryPath)
println("Starting server")
val nodeConf = FullNodeConfiguration(baseDirectoryPath, config)
val server = WebServer(nodeConf).start()
println("Exiting")
}

View File

@ -0,0 +1,14 @@
package net.corda.node.webserver
import net.corda.node.driver.driver
import net.corda.node.services.config.FullNodeConfiguration
fun main(args: Array<String>) {
// TODO: Print basic webserver info
System.setProperty("consoleLogLevel", "info")
driver {
val node = startNode().get()
val server = WebServer(node.configuration).start()
waitForAllNodesToFinish()
}
}

View File

@ -0,0 +1,153 @@
package net.corda.node.webserver
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.utilities.loggerFor
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.servlets.AttachmentDownloadServlet
import net.corda.node.servlets.DataUploadServlet
import net.corda.node.servlets.ResponseFilter
import net.corda.node.webserver.internal.APIServerImpl
import org.eclipse.jetty.server.*
import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.servlet.DefaultServlet
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.webapp.WebAppContext
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer
import java.lang.reflect.InvocationTargetException
import java.net.InetAddress
import java.util.*
class WebServer(val config: FullNodeConfiguration) {
private companion object {
val log = loggerFor<WebServer>()
}
val address = config.webAddress
fun start() {
initWebServer(connectLocalRpcAsNodeUser())
}
private fun initWebServer(localRpc: CordaRPCOps): Server {
// Note that the web server handlers will all run concurrently, and not on the node thread.
val handlerCollection = HandlerCollection()
// Export JMX monitoring statistics and data over REST/JSON.
if (config.exportJMXto.split(',').contains("http")) {
val classpath = System.getProperty("java.class.path").split(System.getProperty("path.separator"))
val warpath = classpath.firstOrNull { it.contains("jolokia-agent-war-2") && it.endsWith(".war") }
if (warpath != null) {
handlerCollection.addHandler(WebAppContext().apply {
// Find the jolokia WAR file on the classpath.
contextPath = "/monitoring/json"
setInitParameter("mimeType", "application/json")
war = warpath
})
} else {
log.warn("Unable to locate Jolokia WAR on classpath")
}
}
// API, data upload and download to services (attachments, rates oracles etc)
handlerCollection.addHandler(buildServletContextHandler(localRpc))
val server = Server()
val connector = if (config.useHTTPS) {
val httpsConfiguration = HttpConfiguration()
httpsConfiguration.outputBufferSize = 32768
httpsConfiguration.addCustomizer(SecureRequestCustomizer())
val sslContextFactory = SslContextFactory()
sslContextFactory.keyStorePath = config.keyStorePath.toString()
sslContextFactory.setKeyStorePassword(config.keyStorePassword)
sslContextFactory.setKeyManagerPassword(config.keyStorePassword)
sslContextFactory.setTrustStorePath(config.trustStorePath.toString())
sslContextFactory.setTrustStorePassword(config.trustStorePassword)
sslContextFactory.setExcludeProtocols("SSL.*", "TLSv1", "TLSv1.1")
sslContextFactory.setIncludeProtocols("TLSv1.2")
sslContextFactory.setExcludeCipherSuites(".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*")
sslContextFactory.setIncludeCipherSuites(".*AES.*GCM.*")
val sslConnector = ServerConnector(server, SslConnectionFactory(sslContextFactory, "http/1.1"), HttpConnectionFactory(httpsConfiguration))
sslConnector.port = address.port
sslConnector
} else {
val httpConfiguration = HttpConfiguration()
httpConfiguration.outputBufferSize = 32768
val httpConnector = ServerConnector(server, HttpConnectionFactory(httpConfiguration))
println("Starting webserver on address $address")
httpConnector.port = address.port
httpConnector
}
server.connectors = arrayOf<Connector>(connector)
server.handler = handlerCollection
//runOnStop += Runnable { server.stop() }
server.start()
log.info("Embedded web server is listening on", "http://${InetAddress.getLocalHost().hostAddress}:${connector.port}/")
return server
}
private fun buildServletContextHandler(localRpc: CordaRPCOps): ServletContextHandler {
return ServletContextHandler().apply {
contextPath = "/"
addServlet(DataUploadServlet::class.java, "/upload/*")
addServlet(AttachmentDownloadServlet::class.java, "/attachments/*")
val resourceConfig = ResourceConfig()
resourceConfig.register(ResponseFilter())
resourceConfig.register(APIServerImpl(localRpc))
val webAPIsOnClasspath = pluginRegistries.flatMap { x -> x.webApis }
println("NUM PLUGINS: ${pluginRegistries.size}")
println("NUM WEBAPIS: ${webAPIsOnClasspath.size}")
for (webapi in webAPIsOnClasspath) {
log.info("Add plugin web API from attachment $webapi")
val customAPI = try {
webapi.apply(localRpc)
} catch (ex: InvocationTargetException) {
log.error("Constructor $webapi threw an error: ", ex.targetException)
continue
}
resourceConfig.register(customAPI)
}
val staticDirMaps = pluginRegistries.map { x -> x.staticServeDirs }
val staticDirs = staticDirMaps.flatMap { it.keys }.zip(staticDirMaps.flatMap { it.values })
staticDirs.forEach {
val staticDir = ServletHolder(DefaultServlet::class.java)
staticDir.setInitParameter("resourceBase", it.second)
staticDir.setInitParameter("dirAllowed", "true")
staticDir.setInitParameter("pathInfoOnly", "true")
addServlet(staticDir, "/web/${it.first}/*")
}
// Give the app a slightly better name in JMX rather than a randomly generated one and enable JMX
resourceConfig.addProperties(mapOf(ServerProperties.APPLICATION_NAME to "node.api",
ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED to "true"))
val container = ServletContainer(resourceConfig)
val jerseyServlet = ServletHolder(container)
addServlet(jerseyServlet, "/api/*")
jerseyServlet.initOrder = 0 // Initialise at server start
}
}
private fun connectLocalRpcAsNodeUser(): CordaRPCOps {
log.info("Connecting to node at ${config.artemisAddress} as node user")
val client = CordaRPCClient(config.artemisAddress, config)
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
return client.proxy()
}
/** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */
val pluginRegistries: List<CordaPluginRegistry> by lazy {
ServiceLoader.load(CordaPluginRegistry::class.java).toList()
}
}

View File

@ -0,0 +1,43 @@
package net.corda.node.webserver.api
import net.corda.core.node.NodeInfo
import java.time.LocalDateTime
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
/**
* Top level interface to external interaction with the distributed ledger.
*
* Wherever a list is returned by a fetchXXX method that corresponds with an input list, that output list will have optional elements
* where a null indicates "missing" and the elements returned will be in the order corresponding with the input list.
*
*/
@Path("")
interface APIServer {
/**
* Report current UTC time as understood by the platform.
*/
@GET
@Path("servertime")
@Produces(MediaType.APPLICATION_JSON)
fun serverTime(): LocalDateTime
/**
* Report whether this node is started up or not.
*/
@GET
@Path("status")
@Produces(MediaType.TEXT_PLAIN)
fun status(): Response
/**
* Report this node's configuration and identities.
*/
@GET
@Path("info")
@Produces(MediaType.APPLICATION_JSON)
fun info(): NodeInfo
}

View File

@ -0,0 +1,21 @@
package net.corda.node.webserver.api
/**
* Extremely rudimentary query language which should most likely be replaced with a product.
*/
interface StatesQuery {
companion object {
fun select(criteria: Criteria): Selection {
return Selection(criteria)
}
}
// TODO make constructors private
data class Selection(val criteria: Criteria) : StatesQuery
interface Criteria {
object AllDeals : Criteria
data class Deal(val ref: String) : Criteria
}
}

View File

@ -0,0 +1,24 @@
package net.corda.node.webserver.internal
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.webserver.api.*
import java.time.LocalDateTime
import java.time.ZoneId
import javax.ws.rs.core.Response
class APIServerImpl(val rpcOps: CordaRPCOps) : APIServer {
override fun serverTime(): LocalDateTime {
return LocalDateTime.ofInstant(rpcOps.currentNodeTime(), ZoneId.of("UTC"))
}
override fun status(): Response {
return if (rpcOps.ready()) {
Response.ok("started").build()
} else {
Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("not started").build()
}
}
override fun info() = rpcOps.nodeIdentity()
}