t # This is a combination of 5 commits.

Driver now queries webserver to ensure it has started.
This commit is contained in:
Clinton Alexander 2017-01-12 14:35:40 +00:00
parent 5f4d4c1da3
commit d4b6e32682
13 changed files with 109 additions and 41 deletions

View File

@ -151,6 +151,9 @@ dependencies {
compile 'io.atomix.copycat:copycat-server:1.1.4'
compile 'io.atomix.catalyst:catalyst-netty:1.1.1'
// OkHTTP: Simple HTTP library.
compile "com.squareup.okhttp3:okhttp:$okhttp_version"
// Integration test helpers
integrationTestCompile "junit:junit:$junit_version"

View File

@ -2,6 +2,8 @@
package net.corda.node.driver
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
@ -13,6 +15,7 @@ import net.corda.core.crypto.Party
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.ApiUtils
import net.corda.core.utilities.loggerFor
import net.corda.node.services.User
import net.corda.node.services.config.ConfigHelper
@ -23,7 +26,9 @@ import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.JsonSupport
import net.corda.node.utilities.ServiceIdentityGenerator
import okhttp3.OkHttpClient
import org.slf4j.Logger
import java.io.File
import java.net.*
@ -35,13 +40,14 @@ import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.*
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import com.sun.corba.se.spi.presentation.rmi.StubAdapter.request
import org.bouncycastle.crypto.tls.ConnectionEnd.client
import okhttp3.Request
/**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
@ -90,7 +96,7 @@ interface DriverDSLExposedInterface {
*
* @param handle The handle for the node that this webserver connects to via RPC.
*/
fun startWebserver(handle: NodeHandle): Future<HostAndPort>
fun startWebserver(handle: NodeHandle): ListenableFuture<HostAndPort>
fun waitForAllNodesToFinish()
}
@ -334,7 +340,7 @@ open class DriverDSL(
val rpcOps = client.proxy(timeout = Duration.of(15, ChronoUnit.SECONDS))
return rpcOps.nodeIdentity()
} catch(e: Exception) {
log.error("Retrying query node info at $nodeAddress")
log.debug("Retrying query node info at $nodeAddress")
retries++
}
@ -416,12 +422,33 @@ open class DriverDSL(
}
}
override fun startWebserver(handle: NodeHandle): Future<HostAndPort> {
private fun queryWebserver(configuration: FullNodeConfiguration): HostAndPort? {
val protocol = if(configuration.useHTTPS) { "https://" } else { "http://" }
val url = URL(protocol + configuration.webAddress.toString() + "/api/status")
val client = OkHttpClient.Builder().connectTimeout(5, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build()
val retries = 5
for(i in 0..retries) {
try {
val response = client.newCall(Request.Builder().url(url).build()).execute()
if (response.isSuccessful && (response.body().string() == "started")) {
return configuration.webAddress
}
} catch(e: ConnectException) {
log.debug("Retrying webserver info at ${ configuration.webAddress}")
}
}
log.error("Could not query node info after $retries retries")
return null
}
override fun startWebserver(handle: NodeHandle): ListenableFuture<HostAndPort> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
return future {
registerProcess(DriverDSL.startWebserver(executorService, handle.configuration, debugPort))
handle.configuration.webAddress
queryWebserver(handle.configuration)!!
}
}

View File

@ -11,11 +11,12 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import net.corda.core.contracts.BusinessCalendar
import net.corda.core.crypto.*
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.i2p.crypto.eddsa.EdDSAPublicKey
import net.corda.core.node.services.IdentityService
import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
@ -25,7 +26,21 @@ import java.time.LocalDateTime
* the java.time API, some core types, and Kotlin data classes.
*/
object JsonSupport {
val javaTimeModule : Module by lazy {
interface PartyObjectMapper {
fun partyFromName(partyName: String): Party?
}
class RpcObjectMapper(val rpc: CordaRPCOps) : PartyObjectMapper, ObjectMapper() {
override fun partyFromName(partyName: String): Party? = rpc.partyFromName(partyName)
}
class IdentityObjectMapper(val identityService: IdentityService) : PartyObjectMapper, ObjectMapper(){
override fun partyFromName(partyName: String) = identityService.partyFromName(partyName)
}
class NoPartyObjectMapper: PartyObjectMapper, ObjectMapper() {
override fun partyFromName(partyName: String) = throw UnsupportedOperationException()
}
val javaTimeModule: Module by lazy {
SimpleModule("java.time").apply {
addSerializer(LocalDate::class.java, ToStringSerializer)
addDeserializer(LocalDate::class.java, LocalDateDeserializer)
@ -34,7 +49,7 @@ object JsonSupport {
}
}
val cordaModule : Module by lazy {
val cordaModule: Module by lazy {
SimpleModule("core").apply {
addSerializer(Party::class.java, PartySerializer)
addDeserializer(Party::class.java, PartyDeserializer)
@ -61,18 +76,24 @@ object JsonSupport {
}
}
fun createDefaultMapper(identities: IdentityService): ObjectMapper =
ServiceHubObjectMapper(identities).apply {
enable(SerializationFeature.INDENT_OUTPUT)
enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
/* Mapper requiring RPC support to deserialise parties from names */
fun createDefaultMapper(rpc: CordaRPCOps): ObjectMapper = configureMapper(RpcObjectMapper(rpc))
registerModule(javaTimeModule)
registerModule(cordaModule)
registerModule(KotlinModule())
}
/* For testing or situations where deserialising parties is not required */
fun createNonRpcMapper(): ObjectMapper = configureMapper(NoPartyObjectMapper())
class ServiceHubObjectMapper(val identities: IdentityService) : ObjectMapper()
/* For testing with an in memory identity service */
fun createInMemoryMapper(identityService: IdentityService) = configureMapper(IdentityObjectMapper(identityService))
private fun configureMapper(mapper: ObjectMapper): ObjectMapper = mapper.apply {
enable(SerializationFeature.INDENT_OUTPUT)
enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
registerModule(javaTimeModule)
registerModule(cordaModule)
registerModule(KotlinModule())
}
object ToStringSerializer : JsonSerializer<Any>() {
override fun serialize(obj: Any, generator: JsonGenerator, provider: SerializerProvider) {
@ -108,9 +129,10 @@ object JsonSupport {
if (parser.currentToken == JsonToken.FIELD_NAME) {
parser.nextToken()
}
val mapper = parser.codec as ServiceHubObjectMapper
val mapper = parser.codec as PartyObjectMapper
// TODO this needs to use some industry identifier(s) not just these human readable names
return mapper.identities.partyFromName(parser.text) ?: throw JsonParseException(parser, "Could not find a Party with name: ${parser.text}")
return mapper.partyFromName(parser.text) ?: throw JsonParseException(parser, "Could not find a Party with name ${parser.text}")
}
}

View File

@ -8,7 +8,8 @@ fun main(args: Array<String>) {
System.setProperty("consoleLogLevel", "info")
driver {
val node = startNode().get()
val server = WebServer(node.configuration).start()
//WebServer(node.configuration).start() // Old in memory way
startWebserver(node)
waitForAllNodesToFinish()
}
}

View File

@ -6,9 +6,10 @@ import net.corda.core.utilities.loggerFor
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.servlets.AttachmentDownloadServlet
import net.corda.node.servlets.DataUploadServlet
import net.corda.node.servlets.ResponseFilter
import net.corda.node.webserver.servlets.AttachmentDownloadServlet
import net.corda.node.webserver.servlets.ObjectMapperConfig
import net.corda.node.webserver.servlets.DataUploadServlet
import net.corda.node.webserver.servlets.ResponseFilter
import net.corda.node.webserver.internal.APIServerImpl
import org.eclipse.jetty.server.*
import org.eclipse.jetty.server.handler.HandlerCollection
@ -32,7 +33,10 @@ class WebServer(val config: FullNodeConfiguration) {
val address = config.webAddress
fun start() {
initWebServer(connectLocalRpcAsNodeUser())
val server = initWebServer(connectLocalRpcAsNodeUser())
while(server.isRunning) {
Thread.sleep(100) // TODO: Redesign
}
}
private fun initWebServer(localRpc: CordaRPCOps): Server {
@ -90,6 +94,7 @@ class WebServer(val config: FullNodeConfiguration) {
server.handler = handlerCollection
//runOnStop += Runnable { server.stop() }
server.start()
println("Server started")
log.info("Embedded web server is listening on", "http://${InetAddress.getLocalHost().hostAddress}:${connector.port}/")
return server
}
@ -101,6 +106,7 @@ class WebServer(val config: FullNodeConfiguration) {
addServlet(AttachmentDownloadServlet::class.java, "/attachments/*")
val resourceConfig = ResourceConfig()
resourceConfig.register(ObjectMapperConfig(localRpc))
resourceConfig.register(ResponseFilter())
resourceConfig.register(APIServerImpl(localRpc))

View File

@ -1,4 +1,4 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.StorageService

View File

@ -1,4 +1,4 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.Node

View File

@ -1,6 +1,7 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import com.fasterxml.jackson.databind.ObjectMapper
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.ServiceHub
import net.corda.node.utilities.JsonSupport
import javax.ws.rs.ext.ContextResolver
@ -11,7 +12,7 @@ import javax.ws.rs.ext.Provider
* and to organise serializers / deserializers for java.time.* classes as necessary.
*/
@Provider
class Config(val services: ServiceHub) : ContextResolver<ObjectMapper> {
val defaultObjectMapper = JsonSupport.createDefaultMapper(services.identityService)
class ObjectMapperConfig(rpc: CordaRPCOps) : ContextResolver<ObjectMapper> {
val defaultObjectMapper = JsonSupport.createDefaultMapper(rpc)
override fun getContext(type: Class<*>) = defaultObjectMapper
}

View File

@ -1,4 +1,4 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import javax.ws.rs.container.ContainerRequestContext
import javax.ws.rs.container.ContainerResponseContext

View File

@ -15,7 +15,7 @@ import kotlin.test.assertEquals
class JsonSupportTest {
companion object {
val mapper = JsonSupport.createDefaultMapper(MockIdentityService(mutableListOf()))
val mapper = JsonSupport.createNonRpcMapper()
}
@Property

View File

@ -18,7 +18,7 @@ class BankOfCordaHttpAPITest {
startNode("BankOfCorda", setOf(ServiceInfo(SimpleNotaryService.type))),
startNode("BigCorporation")
).getOrThrow()
val nodeBankOfCordaApiAddr = nodeBankOfCorda.configuration.webAddress
val nodeBankOfCordaApiAddr = startWebserver(nodeBankOfCorda).getOrThrow()
assert(BankOfCordaClientApi(nodeBankOfCordaApiAddr).requestWebIssue(IssueRequestParams(1000, "USD", "BigCorporation", "1", "BankOfCorda")))
}, isDebug = true)
}

View File

@ -35,14 +35,20 @@ class IRSDemoTest : IntegrationTestCategory {
startNode("Bank B")
).getOrThrow()
val (controllerAddr, nodeAAddr, nodeBAddr) = Futures.allAsList(
startWebserver(controller),
startWebserver(nodeA),
startWebserver(nodeB)
).getOrThrow()
val nextFixingDates = getFixingDateObservable(nodeA.configuration)
runUploadRates(controller.configuration.webAddress)
runTrade(nodeA.configuration.webAddress)
runUploadRates(controllerAddr)
runTrade(nodeAAddr)
// Wait until the initial trade and all scheduled fixings up to the current date have finished
nextFixingDates.first { it == null || it > currentDate }
runDateChange(nodeB.configuration.webAddress)
runDateChange(nodeBAddr)
nextFixingDates.first { it == null || it > futureDate }
}
}
@ -78,4 +84,4 @@ class IRSDemoTest : IntegrationTestCategory {
val url = URL("http://$host/upload/interest-rates")
assert(uploadFile(url, fileContents))
}
}
}

View File

@ -11,6 +11,7 @@ import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flatMap
import net.corda.core.flows.FlowStateMachine
import net.corda.core.map
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.success
import net.corda.core.transactions.SignedTransaction
@ -18,6 +19,7 @@ import net.corda.flows.TwoPartyDealFlow.Acceptor
import net.corda.flows.TwoPartyDealFlow.AutoOffer
import net.corda.flows.TwoPartyDealFlow.Instigator
import net.corda.irs.contract.InterestRateSwap
import net.corda.node.internal.CordaRPCOpsImpl
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.initiateSingleShotFlow
import net.corda.testing.node.InMemoryMessagingNetwork
@ -30,7 +32,7 @@ import java.util.*
* A simulation in which banks execute interest rate swaps with each other, including the fixing events.
*/
class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(networkSendManuallyPumped, runAsync, latencyInjector) {
val om = net.corda.node.utilities.JsonSupport.createDefaultMapper(MockIdentityService(network.identities))
val om = net.corda.node.utilities.JsonSupport.createInMemoryMapper(MockIdentityService(network.identities))
init {
currentDateAndTime = LocalDate.of(2016, 3, 8).atStartOfDay()