mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
Integration test for http network map service (#2078)
* make node info file copying optional by setting "compatabilityZoneURL" in driver integration test for node using http network map using driver some bug fixes * rebase to feature branch and fixup * add initialRegistration flag to driver * remove useFileBaseNetworkMap flag, add network map server to DriverTest * remove useFileBaseNetworkMap flag, add network map server to DriverTest * use PortAllocation.Incremental instead of random * * use PortAllocation.Incremental instead of random * fix NodeInfoWatcher thread leak issue * reset scheduler before create notary * move port allocation out of companion object * move port allocation out of companion object * make node info file copier lateinit to avoid observable thread pool get created on init
This commit is contained in:
parent
9fefabbb88
commit
cc1fba641e
@ -0,0 +1,91 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.testing.ALICE
|
||||
import net.corda.testing.BOB
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.network.NetworkMapServer
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
import java.net.URL
|
||||
|
||||
class NetworkMapClientTest {
|
||||
private val portAllocation = PortAllocation.Incremental(10000)
|
||||
|
||||
@Test
|
||||
fun `nodes can see each other using the http network map`() {
|
||||
NetworkMapServer(1.minutes, portAllocation.nextHostAndPort()).use {
|
||||
val (host, port) = it.start()
|
||||
driver(portAllocation = portAllocation, compatibilityZoneURL = URL("http://$host:$port")) {
|
||||
val alice = startNode(providedName = ALICE.name)
|
||||
val bob = startNode(providedName = BOB.name)
|
||||
|
||||
val notaryNode = defaultNotaryNode.get()
|
||||
val aliceNode = alice.get()
|
||||
val bobNode = bob.get()
|
||||
|
||||
notaryNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
|
||||
aliceNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
|
||||
bobNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `nodes process network map add updates correctly when adding new node to network map`() {
|
||||
NetworkMapServer(1.seconds, portAllocation.nextHostAndPort()).use {
|
||||
val (host, port) = it.start()
|
||||
driver(portAllocation = portAllocation, compatibilityZoneURL = URL("http://$host:$port")) {
|
||||
val alice = startNode(providedName = ALICE.name)
|
||||
val notaryNode = defaultNotaryNode.get()
|
||||
val aliceNode = alice.get()
|
||||
|
||||
notaryNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo)
|
||||
aliceNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo)
|
||||
|
||||
val bob = startNode(providedName = BOB.name)
|
||||
val bobNode = bob.get()
|
||||
|
||||
// Wait for network map client to poll for the next update.
|
||||
Thread.sleep(2.seconds.toMillis())
|
||||
|
||||
bobNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
|
||||
notaryNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
|
||||
aliceNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `nodes process network map remove updates correctly`() {
|
||||
NetworkMapServer(1.seconds, portAllocation.nextHostAndPort()).use {
|
||||
val (host, port) = it.start()
|
||||
driver(portAllocation = portAllocation, compatibilityZoneURL = URL("http://$host:$port")) {
|
||||
val alice = startNode(providedName = ALICE.name)
|
||||
val bob = startNode(providedName = BOB.name)
|
||||
|
||||
val notaryNode = defaultNotaryNode.get()
|
||||
val aliceNode = alice.get()
|
||||
val bobNode = bob.get()
|
||||
|
||||
notaryNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
|
||||
aliceNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
|
||||
bobNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
|
||||
|
||||
it.removeNodeInfo(aliceNode.nodeInfo)
|
||||
|
||||
// Wait for network map client to poll for the next update.
|
||||
Thread.sleep(2.seconds.toMillis())
|
||||
|
||||
notaryNode.onlySees(notaryNode.nodeInfo, bobNode.nodeInfo)
|
||||
bobNode.onlySees(notaryNode.nodeInfo, bobNode.nodeInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun NodeHandle.onlySees(vararg nodes: NodeInfo) = assertThat(rpc.networkMapSnapshot()).containsOnly(*nodes)
|
||||
}
|
@ -184,11 +184,17 @@ open class Node(configuration: NodeConfiguration,
|
||||
return if (!AddressUtils.isPublic(host)) {
|
||||
val foundPublicIP = AddressUtils.tryDetectPublicIP()
|
||||
if (foundPublicIP == null) {
|
||||
val retrievedHostName = networkMapClient?.myPublicHostname()
|
||||
if (retrievedHostName != null) {
|
||||
log.info("Retrieved public IP from Network Map Service: $this. This will be used instead of the provided \"$host\" as the advertised address.")
|
||||
try {
|
||||
val retrievedHostName = networkMapClient?.myPublicHostname()
|
||||
if (retrievedHostName != null) {
|
||||
log.info("Retrieved public IP from Network Map Service: $this. This will be used instead of the provided \"$host\" as the advertised address.")
|
||||
}
|
||||
retrievedHostName
|
||||
} catch (ignore: Throwable) {
|
||||
// Cannot reach the network map service, ignore the exception and use provided P2P address instead.
|
||||
log.warn("Cannot connect to the network map service for public IP detection.")
|
||||
null
|
||||
}
|
||||
retrievedHostName
|
||||
} else {
|
||||
log.info("Detected public IP: ${foundPublicIP.hostAddress}. This will be used instead of the provided \"$host\" as the advertised address.")
|
||||
foundPublicIP.hostAddress
|
||||
|
@ -101,7 +101,7 @@ open class PersistentNetworkMapCache(
|
||||
select(get<String>(NodeInfoSchemaV1.PersistentNodeInfo::hash.name))
|
||||
}
|
||||
}
|
||||
session.createQuery(query).resultList.map { SecureHash.sha256(it) }
|
||||
session.createQuery(query).resultList.map { SecureHash.parse(it) }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,86 +1,43 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.network.TestNodeInfoFactory.createNodeInfo
|
||||
import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.node.network.NetworkMapServer
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.bouncycastle.cert.X509CertificateHolder
|
||||
import org.eclipse.jetty.server.Server
|
||||
import org.eclipse.jetty.server.ServerConnector
|
||||
import org.eclipse.jetty.server.handler.HandlerCollection
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler
|
||||
import org.eclipse.jetty.servlet.ServletHolder
|
||||
import org.glassfish.jersey.server.ResourceConfig
|
||||
import org.glassfish.jersey.servlet.ServletContainer
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.InputStream
|
||||
import java.net.InetSocketAddress
|
||||
import java.net.URL
|
||||
import java.security.cert.CertPath
|
||||
import java.security.cert.Certificate
|
||||
import java.security.cert.CertificateFactory
|
||||
import java.security.cert.X509Certificate
|
||||
import javax.ws.rs.*
|
||||
import javax.ws.rs.core.MediaType
|
||||
import javax.ws.rs.core.Response
|
||||
import javax.ws.rs.core.Response.ok
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NetworkMapClientTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
private lateinit var server: Server
|
||||
|
||||
private lateinit var server: NetworkMapServer
|
||||
private lateinit var networkMapClient: NetworkMapClient
|
||||
private val rootCAKey = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
|
||||
private val rootCACert = X509Utilities.createSelfSignedCACertificate(CordaX500Name(commonName = "Corda Node Root CA", organisation = "R3 LTD", locality = "London", country = "GB"), rootCAKey)
|
||||
private val intermediateCAKey = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
|
||||
private val intermediateCACert = X509Utilities.createCertificate(CertificateType.INTERMEDIATE_CA, rootCACert, rootCAKey, X500Name("CN=Corda Node Intermediate CA,L=London"), intermediateCAKey.public)
|
||||
|
||||
companion object {
|
||||
private val cacheTimeout = 100000.seconds
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
server = Server(InetSocketAddress("localhost", 0)).apply {
|
||||
handler = HandlerCollection().apply {
|
||||
addHandler(ServletContextHandler().apply {
|
||||
contextPath = "/"
|
||||
val resourceConfig = ResourceConfig().apply {
|
||||
// Add your API provider classes (annotated for JAX-RS) here
|
||||
register(MockNetworkMapServer())
|
||||
}
|
||||
val jerseyServlet = ServletHolder(ServletContainer(resourceConfig)).apply { initOrder = 0 }// Initialise at server start
|
||||
addServlet(jerseyServlet, "/*")
|
||||
})
|
||||
}
|
||||
}
|
||||
server.start()
|
||||
|
||||
while (!server.isStarted) {
|
||||
Thread.sleep(100)
|
||||
}
|
||||
|
||||
val hostAndPort = server.connectors.mapNotNull { it as? ServerConnector }.first()
|
||||
networkMapClient = NetworkMapClient(URL("http://${hostAndPort.host}:${hostAndPort.localPort}"))
|
||||
server = NetworkMapServer(cacheTimeout, PortAllocation.Incremental(10000).nextHostAndPort())
|
||||
val hostAndPort = server.start()
|
||||
networkMapClient = NetworkMapClient(URL("http://${hostAndPort.host}:${hostAndPort.port}"))
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
server.stop()
|
||||
server.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -102,7 +59,7 @@ class NetworkMapClientTest {
|
||||
|
||||
val nodeInfoHash2 = nodeInfo2.serialize().sha256()
|
||||
assertThat(networkMapClient.getNetworkMap().networkMap).containsExactly(nodeInfoHash, nodeInfoHash2)
|
||||
assertEquals(100000.seconds, networkMapClient.getNetworkMap().cacheMaxAge)
|
||||
assertEquals(cacheTimeout, networkMapClient.getNetworkMap().cacheMaxAge)
|
||||
assertEquals(nodeInfo2, networkMapClient.getNodeInfo(nodeInfoHash2))
|
||||
}
|
||||
|
||||
@ -111,43 +68,3 @@ class NetworkMapClientTest {
|
||||
assertEquals("test.host.name", networkMapClient.myPublicHostname())
|
||||
}
|
||||
}
|
||||
|
||||
@Path("network-map")
|
||||
// This is a stub implementation of the network map rest API.
|
||||
internal class MockNetworkMapServer {
|
||||
val nodeInfoMap = mutableMapOf<SecureHash, NodeInfo>()
|
||||
@POST
|
||||
@Path("publish")
|
||||
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
|
||||
fun publishNodeInfo(input: InputStream): Response {
|
||||
val registrationData = input.readBytes().deserialize<SignedData<NodeInfo>>()
|
||||
val nodeInfo = registrationData.verified()
|
||||
val nodeInfoHash = nodeInfo.serialize().sha256()
|
||||
nodeInfoMap.put(nodeInfoHash, nodeInfo)
|
||||
return ok().build()
|
||||
}
|
||||
|
||||
@GET
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
fun getNetworkMap(): Response {
|
||||
return Response.ok(ObjectMapper().writeValueAsString(nodeInfoMap.keys.map { it.toString() })).header("Cache-Control", "max-age=100000").build()
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("{var}")
|
||||
@Produces(MediaType.APPLICATION_OCTET_STREAM)
|
||||
fun getNodeInfo(@PathParam("var") nodeInfoHash: String): Response {
|
||||
val nodeInfo = nodeInfoMap[SecureHash.parse(nodeInfoHash)]
|
||||
return if (nodeInfo != null) {
|
||||
Response.ok(nodeInfo.serialize().bytes)
|
||||
} else {
|
||||
Response.status(Response.Status.NOT_FOUND)
|
||||
}.build()
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("my-hostname")
|
||||
fun getHostName(): Response {
|
||||
return Response.ok("test.host.name").build()
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,17 @@ dependencies {
|
||||
// Integration test helpers
|
||||
integrationTestCompile "org.assertj:assertj-core:${assertj_version}"
|
||||
integrationTestCompile "junit:junit:$junit_version"
|
||||
|
||||
// Jetty dependencies for NetworkMapClient test.
|
||||
// Web stuff: for HTTP[S] servlets
|
||||
compile "org.eclipse.jetty:jetty-servlet:${jetty_version}"
|
||||
compile "org.eclipse.jetty:jetty-webapp:${jetty_version}"
|
||||
compile "javax.servlet:javax.servlet-api:3.1.0"
|
||||
|
||||
// Jersey for JAX-RS implementation for use in Jetty
|
||||
compile "org.glassfish.jersey.core:jersey-server:${jersey_version}"
|
||||
compile "org.glassfish.jersey.containers:jersey-container-servlet-core:${jersey_version}"
|
||||
compile "org.glassfish.jersey.containers:jersey-container-jetty-http:${jersey_version}"
|
||||
}
|
||||
|
||||
task integrationTest(type: Test) {
|
||||
|
@ -1,25 +1,28 @@
|
||||
package net.corda.testing.driver
|
||||
|
||||
import com.sun.net.httpserver.HttpExchange
|
||||
import com.sun.net.httpserver.HttpHandler
|
||||
import com.sun.net.httpserver.HttpServer
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.list
|
||||
import net.corda.core.internal.readLines
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.testing.DUMMY_BANK_A
|
||||
import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.DUMMY_REGULATOR
|
||||
import net.corda.testing.ProjectStructure.projectRootDir
|
||||
import net.corda.testing.node.NotarySpec
|
||||
import net.corda.testing.node.network.NetworkMapServer
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
import java.net.InetSocketAddress
|
||||
import java.net.URL
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import javax.ws.rs.GET
|
||||
import javax.ws.rs.POST
|
||||
import javax.ws.rs.Path
|
||||
import javax.ws.rs.core.Response
|
||||
import javax.ws.rs.core.Response.ok
|
||||
|
||||
class DriverTests {
|
||||
companion object {
|
||||
@ -37,6 +40,7 @@ class DriverTests {
|
||||
addressMustNotBeBound(executorService, hostAndPort)
|
||||
}
|
||||
}
|
||||
private val portAllocation = PortAllocation.Incremental(10000)
|
||||
|
||||
@Test
|
||||
fun `simple node startup and shutdown`() {
|
||||
@ -49,7 +53,7 @@ class DriverTests {
|
||||
|
||||
@Test
|
||||
fun `random free port allocation`() {
|
||||
val nodeHandle = driver(portAllocation = PortAllocation.RandomFree) {
|
||||
val nodeHandle = driver(portAllocation = portAllocation) {
|
||||
val nodeInfo = startNode(providedName = DUMMY_BANK_A.name)
|
||||
nodeMustBeUp(nodeInfo)
|
||||
}
|
||||
@ -58,33 +62,14 @@ class DriverTests {
|
||||
|
||||
@Test
|
||||
fun `node registration`() {
|
||||
// Very simple Http handler which counts the requests it has received and always returns the same payload.
|
||||
val handler = object : HttpHandler {
|
||||
private val _requests = mutableListOf<String>()
|
||||
val requests: List<String>
|
||||
get() = _requests.toList()
|
||||
|
||||
override fun handle(exchange: HttpExchange) {
|
||||
val response = "reply"
|
||||
_requests.add(exchange.requestURI.toString())
|
||||
exchange.responseHeaders.set("Content-Type", "text/html; charset=" + Charsets.UTF_8)
|
||||
exchange.sendResponseHeaders(200, response.length.toLong())
|
||||
exchange.responseBody.use { it.write(response.toByteArray()) }
|
||||
val handler = RegistrationHandler()
|
||||
NetworkMapServer(1.minutes, portAllocation.nextHostAndPort(), handler).use {
|
||||
val (host, port) = it.start()
|
||||
driver(portAllocation = portAllocation, compatibilityZoneURL = URL("http://$host:$port")) {
|
||||
// Wait for the node to have started.
|
||||
startNode(initialRegistration = true).get()
|
||||
}
|
||||
}
|
||||
|
||||
val inetSocketAddress = InetSocketAddress(0)
|
||||
val server = HttpServer.create(inetSocketAddress, 0)
|
||||
val port = server.address.port
|
||||
server.createContext("/", handler)
|
||||
server.executor = null // creates a default executor
|
||||
server.start()
|
||||
|
||||
driver(compatibilityZoneURL = URL("http://localhost:$port")) {
|
||||
// Wait for the notary to have started.
|
||||
notaryHandles.first().nodeHandles.get()
|
||||
}
|
||||
|
||||
// We're getting:
|
||||
// a request to sign the certificate then
|
||||
// at least one poll request to see if the request has been approved.
|
||||
@ -120,3 +105,20 @@ class DriverTests {
|
||||
assertThat(baseDirectory / "process-id").doesNotExist()
|
||||
}
|
||||
}
|
||||
|
||||
@Path("certificate")
|
||||
class RegistrationHandler {
|
||||
val requests = mutableListOf<String>()
|
||||
@POST
|
||||
fun registration(): Response {
|
||||
requests += "/certificate"
|
||||
return ok("reply").build()
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("reply")
|
||||
fun reply(): Response {
|
||||
requests += "/certificate/reply"
|
||||
return ok().build()
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.concurrent.*
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
@ -48,6 +47,7 @@ import okhttp3.Request
|
||||
import org.slf4j.Logger
|
||||
import rx.Observable
|
||||
import rx.observables.ConnectableObservable
|
||||
import rx.schedulers.Schedulers
|
||||
import java.net.*
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
@ -111,13 +111,14 @@ interface DriverDSLExposedInterface : CordformContext {
|
||||
* Returns the [NotaryHandle] for the single notary on the network. Throws if there are none or more than one.
|
||||
* @see notaryHandles
|
||||
*/
|
||||
val defaultNotaryHandle: NotaryHandle get() {
|
||||
return when (notaryHandles.size) {
|
||||
0 -> throw IllegalStateException("There are no notaries defined on the network")
|
||||
1 -> notaryHandles[0]
|
||||
else -> throw IllegalStateException("There is more than one notary defined on the network")
|
||||
val defaultNotaryHandle: NotaryHandle
|
||||
get() {
|
||||
return when (notaryHandles.size) {
|
||||
0 -> throw IllegalStateException("There are no notaries defined on the network")
|
||||
1 -> notaryHandles[0]
|
||||
else -> throw IllegalStateException("There is more than one notary defined on the network")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the identity of the single notary on the network. Throws if there are none or more than one.
|
||||
@ -131,11 +132,12 @@ interface DriverDSLExposedInterface : CordformContext {
|
||||
* @see defaultNotaryHandle
|
||||
* @see notaryHandles
|
||||
*/
|
||||
val defaultNotaryNode: CordaFuture<NodeHandle> get() {
|
||||
return defaultNotaryHandle.nodeHandles.map {
|
||||
it.singleOrNull() ?: throw IllegalStateException("Default notary is not a single node")
|
||||
val defaultNotaryNode: CordaFuture<NodeHandle>
|
||||
get() {
|
||||
return defaultNotaryHandle.nodeHandles.map {
|
||||
it.singleOrNull() ?: throw IllegalStateException("Default notary is not a single node")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a node.
|
||||
@ -157,7 +159,9 @@ interface DriverDSLExposedInterface : CordformContext {
|
||||
verifierType: VerifierType = defaultParameters.verifierType,
|
||||
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
|
||||
startInSameProcess: Boolean? = defaultParameters.startInSameProcess,
|
||||
maximumHeapSize: String = defaultParameters.maximumHeapSize): CordaFuture<NodeHandle>
|
||||
maximumHeapSize: String = defaultParameters.maximumHeapSize,
|
||||
initialRegistration: Boolean = defaultParameters.initialRegistration): CordaFuture<NodeHandle>
|
||||
|
||||
|
||||
/**
|
||||
* Helper function for starting a [Node] with custom parameters from Java.
|
||||
@ -297,7 +301,8 @@ data class NodeParameters(
|
||||
val verifierType: VerifierType = VerifierType.InMemory,
|
||||
val customOverrides: Map<String, Any?> = emptyMap(),
|
||||
val startInSameProcess: Boolean? = null,
|
||||
val maximumHeapSize: String = "200m"
|
||||
val maximumHeapSize: String = "200m",
|
||||
val initialRegistration: Boolean = false
|
||||
) {
|
||||
fun setProvidedName(providedName: CordaX500Name?) = copy(providedName = providedName)
|
||||
fun setRpcUsers(rpcUsers: List<User>) = copy(rpcUsers = rpcUsers)
|
||||
@ -332,8 +337,9 @@ data class NodeParameters(
|
||||
* @param useTestClock If true the test clock will be used in Node.
|
||||
* @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or
|
||||
* not. Note that this may be overridden in [DriverDSLExposedInterface.startNode].
|
||||
* @param notarySpecs The notaries advertised for this network. These nodes will be started
|
||||
automatically and will be* available from [DriverDSLExposedInterface.notaryHandles]. Defaults to a simple validating notary.* @param compatibilityZoneURL if not null each node is started once in registration mode (which makes the node register and quit),
|
||||
* @param notarySpecs The notaries advertised for this network. These nodes will be started automatically and will be
|
||||
* available from [DriverDSLExposedInterface.notaryHandles]. Defaults to a simple validating notary.
|
||||
* @param compatibilityZoneURL if not null each node is started once in registration mode (which makes the node register and quit),
|
||||
* and then re-starts the node with the given parameters.
|
||||
* @param dsl The dsl itself.
|
||||
* @return The value returned in the [dsl] closure.
|
||||
@ -601,7 +607,8 @@ class DriverDSL(
|
||||
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
|
||||
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
|
||||
// Investigate whether we can avoid that.
|
||||
private val nodeInfoFilesCopier = NodeInfoFilesCopier()
|
||||
// TODO: NodeInfoFilesCopier create observable threads in the init method, we should move that to a start method instead, changing this to lateinit instead to prevent that.
|
||||
private lateinit var nodeInfoFilesCopier: NodeInfoFilesCopier
|
||||
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
|
||||
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
|
||||
private lateinit var _notaries: List<NotaryHandle>
|
||||
@ -662,30 +669,42 @@ class DriverDSL(
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String
|
||||
maximumHeapSize: String,
|
||||
initialRegistration: Boolean
|
||||
): CordaFuture<NodeHandle> {
|
||||
val p2pAddress = portAllocation.nextHostAndPort()
|
||||
// TODO: Derive name from the full picked name, don't just wrap the common name
|
||||
val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB")
|
||||
val registrationFuture = compatibilityZoneURL?.let { registerNode(name, it) } ?: doneFuture(Unit)
|
||||
|
||||
val registrationFuture = if (initialRegistration) {
|
||||
compatibilityZoneURL ?: throw IllegalArgumentException("Compatibility zone URL must be provided for initial registration.")
|
||||
registerNode(name, compatibilityZoneURL)
|
||||
} else {
|
||||
doneFuture(Unit)
|
||||
}
|
||||
|
||||
return registrationFuture.flatMap {
|
||||
val rpcAddress = portAllocation.nextHostAndPort()
|
||||
val webAddress = portAllocation.nextHostAndPort()
|
||||
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
|
||||
val configMap = configOf(
|
||||
"myLegalName" to name.toString(),
|
||||
"p2pAddress" to p2pAddress.toString(),
|
||||
"rpcAddress" to rpcAddress.toString(),
|
||||
"webAddress" to webAddress.toString(),
|
||||
"useTestClock" to useTestClock,
|
||||
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
|
||||
"verifierType" to verifierType.name
|
||||
) + customOverrides
|
||||
val config = ConfigHelper.loadConfig(
|
||||
baseDirectory = baseDirectory(name),
|
||||
allowMissingConfig = true,
|
||||
configOverrides = configOf(
|
||||
"myLegalName" to name.toString(),
|
||||
"p2pAddress" to p2pAddress.toString(),
|
||||
"rpcAddress" to rpcAddress.toString(),
|
||||
"webAddress" to webAddress.toString(),
|
||||
"useTestClock" to useTestClock,
|
||||
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
|
||||
"verifierType" to verifierType.name
|
||||
) + customOverrides
|
||||
configOverrides = if (compatibilityZoneURL != null) {
|
||||
configMap + mapOf("compatibilityZoneURL" to compatibilityZoneURL.toString())
|
||||
} else {
|
||||
configMap
|
||||
}
|
||||
)
|
||||
|
||||
startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
|
||||
}
|
||||
}
|
||||
@ -756,9 +775,15 @@ class DriverDSL(
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
if (startNodesInProcess) {
|
||||
Schedulers.reset()
|
||||
}
|
||||
_executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
|
||||
_shutdownManager = ShutdownManager(executorService)
|
||||
|
||||
nodeInfoFilesCopier = NodeInfoFilesCopier()
|
||||
shutdownManager.registerShutdown { nodeInfoFilesCopier.close() }
|
||||
|
||||
val notaryInfos = generateNotaryIdentities()
|
||||
// The network parameters must be serialised before starting any of the nodes
|
||||
networkParameters = NetworkParametersCopier(testNetworkParameters(notaryInfos))
|
||||
@ -923,10 +948,14 @@ class DriverDSL(
|
||||
maximumHeapSize: String): CordaFuture<NodeHandle> {
|
||||
val configuration = config.parseAsNodeConfiguration()
|
||||
val baseDirectory = configuration.baseDirectory.createDirectories()
|
||||
// Distribute node info file using file copier when network map service URL (compatibilityZoneURL) is null.
|
||||
// TODO: need to implement the same in cordformation?
|
||||
val nodeInfoFilesCopier = if (compatibilityZoneURL == null) nodeInfoFilesCopier else null
|
||||
|
||||
nodeInfoFilesCopier?.addConfig(baseDirectory)
|
||||
networkParameters.install(baseDirectory)
|
||||
nodeInfoFilesCopier.addConfig(baseDirectory)
|
||||
val onNodeExit: () -> Unit = {
|
||||
nodeInfoFilesCopier.removeConfig(baseDirectory)
|
||||
nodeInfoFilesCopier?.removeConfig(baseDirectory)
|
||||
countObservables.remove(configuration.myLegalName)
|
||||
}
|
||||
if (startInProcess ?: startNodesInProcess) {
|
||||
@ -1017,8 +1046,8 @@ class DriverDSL(
|
||||
node.internals.run()
|
||||
}
|
||||
node to nodeThread
|
||||
}.flatMap {
|
||||
nodeAndThread -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread }
|
||||
}.flatMap { nodeAndThread ->
|
||||
addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread }
|
||||
}
|
||||
}
|
||||
|
||||
@ -1063,7 +1092,7 @@ class DriverDSL(
|
||||
}
|
||||
}.toList()
|
||||
|
||||
return ProcessUtilities.startCordaProcess(
|
||||
return ProcessUtilities.startCordaProcess(
|
||||
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
|
||||
arguments = arguments,
|
||||
jdwpPort = debugPort,
|
||||
|
@ -266,6 +266,7 @@ private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityMan
|
||||
override fun validateUser(user: String?, password: String?, remotingConnection: RemotingConnection?): String? {
|
||||
return validate(user, password)
|
||||
}
|
||||
|
||||
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?, address: String?, connection: RemotingConnection?): String? {
|
||||
return validate(user, password)
|
||||
}
|
||||
|
@ -0,0 +1,115 @@
|
||||
package net.corda.testing.node.network
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import org.eclipse.jetty.server.Server
|
||||
import org.eclipse.jetty.server.ServerConnector
|
||||
import org.eclipse.jetty.server.handler.HandlerCollection
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler
|
||||
import org.eclipse.jetty.servlet.ServletHolder
|
||||
import org.glassfish.jersey.server.ResourceConfig
|
||||
import org.glassfish.jersey.servlet.ServletContainer
|
||||
import java.io.Closeable
|
||||
import java.io.InputStream
|
||||
import java.net.InetSocketAddress
|
||||
import java.time.Duration
|
||||
import javax.ws.rs.*
|
||||
import javax.ws.rs.core.MediaType
|
||||
import javax.ws.rs.core.Response
|
||||
import javax.ws.rs.core.Response.ok
|
||||
|
||||
class NetworkMapServer(cacheTimeout: Duration, hostAndPort: NetworkHostAndPort, vararg additionalServices: Any) : Closeable {
|
||||
private val server: Server
|
||||
|
||||
private val service = InMemoryNetworkMapService(cacheTimeout)
|
||||
|
||||
init {
|
||||
server = Server(InetSocketAddress(hostAndPort.host, hostAndPort.port)).apply {
|
||||
handler = HandlerCollection().apply {
|
||||
addHandler(ServletContextHandler().apply {
|
||||
contextPath = "/"
|
||||
val resourceConfig = ResourceConfig().apply {
|
||||
// Add your API provider classes (annotated for JAX-RS) here
|
||||
register(service)
|
||||
additionalServices.forEach { register(it) }
|
||||
}
|
||||
val jerseyServlet = ServletHolder(ServletContainer(resourceConfig)).apply { initOrder = 0 }// Initialise at server start
|
||||
addServlet(jerseyServlet, "/*")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun start(): NetworkHostAndPort {
|
||||
server.start()
|
||||
// Wait until server is up to obtain the host and port.
|
||||
while (!server.isStarted) {
|
||||
Thread.sleep(500)
|
||||
}
|
||||
return server.connectors
|
||||
.mapNotNull { it as? ServerConnector }
|
||||
.first()
|
||||
.let { NetworkHostAndPort(it.host, it.localPort) }
|
||||
}
|
||||
|
||||
fun removeNodeInfo(nodeInfo: NodeInfo) {
|
||||
service.removeNodeInfo(nodeInfo)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
server.stop()
|
||||
}
|
||||
|
||||
@Path("network-map")
|
||||
class InMemoryNetworkMapService(private val cacheTimeout: Duration) {
|
||||
private val nodeInfoMap = mutableMapOf<SecureHash, NodeInfo>()
|
||||
@POST
|
||||
@Path("publish")
|
||||
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
|
||||
fun publishNodeInfo(input: InputStream): Response {
|
||||
val registrationData = input.readBytes().deserialize<SignedData<NodeInfo>>()
|
||||
val nodeInfo = registrationData.verified()
|
||||
val nodeInfoHash = nodeInfo.serialize().sha256()
|
||||
nodeInfoMap.put(nodeInfoHash, nodeInfo)
|
||||
return ok().build()
|
||||
}
|
||||
|
||||
@GET
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
fun getNetworkMap(): Response {
|
||||
return Response.ok(ObjectMapper().writeValueAsString(nodeInfoMap.keys.map { it.toString() }))
|
||||
.header("Cache-Control", "max-age=${cacheTimeout.seconds}")
|
||||
.build()
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("{var}")
|
||||
@Produces(MediaType.APPLICATION_OCTET_STREAM)
|
||||
fun getNodeInfo(@PathParam("var") nodeInfoHash: String): Response {
|
||||
val nodeInfo = nodeInfoMap[SecureHash.parse(nodeInfoHash)]
|
||||
return if (nodeInfo != null) {
|
||||
Response.ok(nodeInfo.serialize().bytes)
|
||||
} else {
|
||||
Response.status(Response.Status.NOT_FOUND)
|
||||
}.build()
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("my-hostname")
|
||||
fun getHostName(): Response {
|
||||
return Response.ok("test.host.name").build()
|
||||
}
|
||||
|
||||
// Remove nodeInfo for testing.
|
||||
fun removeNodeInfo(nodeInfo: NodeInfo) {
|
||||
nodeInfoMap.remove(nodeInfo.serialize().hash)
|
||||
}
|
||||
}
|
||||
}
|
@ -16,12 +16,12 @@ import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.config.configureDevKeyAndTrustStores
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import net.corda.nodeapi.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.testing.driver.*
|
||||
import net.corda.testing.internal.ProcessUtilities
|
||||
import net.corda.testing.node.NotarySpec
|
||||
|
Loading…
Reference in New Issue
Block a user