diff --git a/.gitignore b/.gitignore
index f3ae176bcc..68b8472e9b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,6 +32,7 @@ lib/dokka.jar
.idea/libraries
.idea/shelf
.idea/dataSources
+/gradle-plugins/.idea
# Include the -parameters compiler option by default in IntelliJ required for serialization.
!.idea/compiler.xml
@@ -53,6 +54,7 @@ lib/dokka.jar
# Gradle:
# .idea/gradle.xml
# .idea/libraries
+/gradle-plugins/gradle*
# Mongo Explorer plugin:
# .idea/mongoSettings.xml
@@ -65,6 +67,7 @@ lib/dokka.jar
# IntelliJ
/out/
+/classes/
# mpeltonen/sbt-idea plugin
.idea_modules/
diff --git a/.idea/compiler.xml b/.idea/compiler.xml
index d639c80f07..acd527bc35 100644
--- a/.idea/compiler.xml
+++ b/.idea/compiler.xml
@@ -17,6 +17,8 @@
+
+
@@ -59,10 +61,13 @@
+
+
+
diff --git a/build.gradle b/build.gradle
index bf86531ff6..54bc126c8b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -16,6 +16,11 @@ buildscript {
// TODO: Sort this alphabetically.
ext.kotlin_version = constants.getProperty("kotlinVersion")
ext.quasar_version = '0.7.6' // TODO: Upgrade to 0.7.7+ when Quasar bug 238 is resolved.
+
+ // gradle-capsule-plugin:1.0.2 contains capsule:1.0.1
+ // TODO: Upgrade gradle-capsule-plugin to a version with capsule:1.0.3
+ ext.capsule_version = '1.0.1'
+
ext.asm_version = '0.5.3'
ext.artemis_version = '1.5.3'
ext.jackson_version = '2.8.5'
@@ -41,6 +46,7 @@ buildscript {
ext.rxjava_version = '1.2.4'
ext.requery_version = '1.2.1'
ext.dokka_version = '0.9.13'
+ ext.eddsa_version = '0.2.0'
// Update 121 is required for ObjectInputFilter and at time of writing 131 was latest:
ext.java8_minUpdateVersion = '131'
@@ -60,12 +66,14 @@ buildscript {
classpath "org.jetbrains.kotlin:kotlin-noarg:$kotlin_version"
classpath "org.jetbrains.dokka:dokka-gradle-plugin:${dokka_version}"
classpath "org.ajoberstar:grgit:1.1.0"
+ classpath "net.i2p.crypto:eddsa:$eddsa_version" // Needed for ServiceIdentityGenerator in the build environment.
}
}
plugins {
// TODO The capsule plugin requires the newer DSL plugin block.It would be nice if we could unify all the plugins into one style,
// but the DSL has some restrictions e.g can't be used on the allprojects section. So we should revisit this if there are improvements in Gradle.
+ // Version 1.0.2 of this plugin uses capsule:1.0.1
id "us.kirchmeier.capsule" version "1.0.2"
}
@@ -249,7 +257,7 @@ bintrayConfig {
projectUrl = 'https://github.com/corda/corda'
gpgSign = true
gpgPassphrase = System.getenv('CORDA_BINTRAY_GPG_PASSPHRASE')
- publications = ['jfx', 'mock', 'rpc', 'core', 'corda', 'corda-webserver', 'finance', 'node', 'node-api', 'node-schemas', 'test-utils', 'jackson', 'verifier', 'webserver']
+ publications = ['jfx', 'mock', 'rpc', 'core', 'corda', 'cordform-common', 'corda-webserver', 'finance', 'node', 'node-api', 'node-schemas', 'test-utils', 'jackson', 'verifier', 'webserver']
license {
name = 'Apache-2.0'
url = 'https://www.apache.org/licenses/LICENSE-2.0'
diff --git a/client/jackson/src/main/kotlin/net/corda/jackson/JacksonSupport.kt b/client/jackson/src/main/kotlin/net/corda/jackson/JacksonSupport.kt
index 361313dbe3..100fc2f2f7 100644
--- a/client/jackson/src/main/kotlin/net/corda/jackson/JacksonSupport.kt
+++ b/client/jackson/src/main/kotlin/net/corda/jackson/JacksonSupport.kt
@@ -10,6 +10,7 @@ import com.fasterxml.jackson.module.kotlin.KotlinModule
import net.corda.core.contracts.Amount
import net.corda.core.contracts.BusinessCalendar
import net.corda.core.crypto.*
+import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
@@ -43,18 +44,21 @@ object JacksonSupport {
}
class RpcObjectMapper(val rpc: CordaRPCOps, factory: JsonFactory) : PartyObjectMapper, ObjectMapper(factory) {
+ @Suppress("OverridingDeprecatedMember", "DEPRECATION")
override fun partyFromName(partyName: String): Party? = rpc.partyFromName(partyName)
override fun partyFromPrincipal(principal: X500Name): Party? = rpc.partyFromX500Name(principal)
override fun partyFromKey(owningKey: PublicKey): Party? = rpc.partyFromKey(owningKey)
}
class IdentityObjectMapper(val identityService: IdentityService, factory: JsonFactory) : PartyObjectMapper, ObjectMapper(factory) {
+ @Suppress("OverridingDeprecatedMember", "DEPRECATION")
override fun partyFromName(partyName: String): Party? = identityService.partyFromName(partyName)
override fun partyFromPrincipal(principal: X500Name): Party? = identityService.partyFromX500Name(principal)
override fun partyFromKey(owningKey: PublicKey): Party? = identityService.partyFromKey(owningKey)
}
class NoPartyObjectMapper(factory: JsonFactory) : PartyObjectMapper, ObjectMapper(factory) {
+ @Suppress("OverridingDeprecatedMember", "DEPRECATION")
override fun partyFromName(partyName: String): Party? = throw UnsupportedOperationException()
override fun partyFromPrincipal(principal: X500Name): Party? = throw UnsupportedOperationException()
override fun partyFromKey(owningKey: PublicKey): Party? = throw UnsupportedOperationException()
@@ -66,6 +70,7 @@ object JacksonSupport {
addDeserializer(AnonymousParty::class.java, AnonymousPartyDeserializer)
addSerializer(Party::class.java, PartySerializer)
addDeserializer(Party::class.java, PartyDeserializer)
+ addDeserializer(AbstractParty::class.java, PartyDeserializer)
addSerializer(BigDecimal::class.java, ToStringSerializer)
addDeserializer(BigDecimal::class.java, NumberDeserializers.BigDecimalDeserializer())
addSerializer(SecureHash::class.java, SecureHashSerializer)
@@ -160,8 +165,20 @@ object JacksonSupport {
}
val mapper = parser.codec as PartyObjectMapper
- val principal = X500Name(parser.text)
- return mapper.partyFromPrincipal(principal) ?: throw JsonParseException(parser, "Could not find a Party with name ${principal}")
+ // TODO: We should probably have a better specified way of identifying X.500 names vs keys
+ // Base58 keys never include an equals character, while X.500 names always will, so we use that to determine
+ // how to parse the content
+ return if (parser.text.contains("=")) {
+ val principal = X500Name(parser.text)
+ mapper.partyFromPrincipal(principal) ?: throw JsonParseException(parser, "Could not find a Party with name ${principal}")
+ } else {
+ val key = try {
+ parsePublicKeyBase58(parser.text)
+ } catch (e: Exception) {
+ throw JsonParseException(parser, "Could not interpret ${parser.text} as a base58 encoded public key")
+ }
+ mapper.partyFromKey(key) ?: throw JsonParseException(parser, "Could not find a Party with key ${key.toStringShort()}")
+ }
}
}
diff --git a/client/jackson/src/main/kotlin/net/corda/jackson/StringToMethodCallParser.kt b/client/jackson/src/main/kotlin/net/corda/jackson/StringToMethodCallParser.kt
index 225bb9ec59..45454ef1d0 100644
--- a/client/jackson/src/main/kotlin/net/corda/jackson/StringToMethodCallParser.kt
+++ b/client/jackson/src/main/kotlin/net/corda/jackson/StringToMethodCallParser.kt
@@ -193,8 +193,7 @@ open class StringToMethodCallParser @JvmOverloads constructor(
val parameterString = "{ $args }"
val tree: JsonNode = om.readTree(parameterString) ?: throw UnparseableCallException(args)
if (tree.size() > parameters.size) throw UnparseableCallException.TooManyParameters(methodNameHint, args)
- val inOrderParams: List = parameters.mapIndexed { _, param ->
- val (argName, argType) = param
+ val inOrderParams: List = parameters.mapIndexed { _, (argName, argType) ->
val entry = tree[argName] ?: throw UnparseableCallException.MissingParameter(methodNameHint, argName, args)
try {
om.readValue(entry.traverse(om), argType)
diff --git a/client/jackson/src/test/kotlin/net/corda/jackson/StringToMethodCallParserTest.kt b/client/jackson/src/test/kotlin/net/corda/jackson/StringToMethodCallParserTest.kt
index 7c46e4919b..7610fbf91b 100644
--- a/client/jackson/src/test/kotlin/net/corda/jackson/StringToMethodCallParserTest.kt
+++ b/client/jackson/src/test/kotlin/net/corda/jackson/StringToMethodCallParserTest.kt
@@ -1,14 +1,16 @@
package net.corda.jackson
import net.corda.core.crypto.SecureHash
+import org.junit.Assert.assertArrayEquals
import org.junit.Test
+import kotlin.reflect.full.primaryConstructor
import kotlin.test.assertEquals
class StringToMethodCallParserTest {
@Suppress("UNUSED")
class Target {
fun simple() = "simple"
- fun string(note: String) = note
+ fun string(noteTextWord: String) = noteTextWord
fun twoStrings(a: String, b: String) = a + b
fun simpleObject(hash: SecureHash.SHA256) = hash.toString()
fun complexObject(pair: Pair) = pair
@@ -20,7 +22,7 @@ class StringToMethodCallParserTest {
val randomHash = "361170110f61086f77ff2c5b7ab36513705da1a3ebabf14dbe5cc9c982c45401"
val tests = mapOf(
"simple" to "simple",
- "string note: A test of barewords" to "A test of barewords",
+ "string noteTextWord: A test of barewords" to "A test of barewords",
"twoStrings a: Some words, b: ' and some words, like, Kirk, would, speak'" to "Some words and some words, like, Kirk, would, speak",
"simpleObject hash: $randomHash" to randomHash.toUpperCase(),
"complexObject pair: { first: 12, second: Word up brother }" to Pair(12, "Word up brother"),
@@ -36,4 +38,31 @@ class StringToMethodCallParserTest {
assertEquals(output, parser.parse(target, input).invoke())
}
}
+
+ @Suppress("UNUSED")
+ class ConstructorTarget(val someWord: String, val aDifferentThing: Int) {
+ constructor(alternativeWord: String) : this(alternativeWord, 0)
+ }
+
+ @Test
+ fun ctor1() {
+ val clazz = ConstructorTarget::class.java
+ val parser = StringToMethodCallParser(clazz)
+ val ctor = clazz.constructors.single { it.parameterCount == 2 }
+ val names: List = parser.paramNamesFromConstructor(ctor)
+ assertEquals(listOf("someWord", "aDifferentThing"), names)
+ val args: Array = parser.parseArguments(clazz.name, names.zip(ctor.parameterTypes), "someWord: Blah blah blah, aDifferentThing: 12")
+ assertArrayEquals(args, arrayOf("Blah blah blah", 12))
+ }
+
+ @Test
+ fun ctor2() {
+ val clazz = ConstructorTarget::class.java
+ val parser = StringToMethodCallParser(clazz)
+ val ctor = clazz.constructors.single { it.parameterCount == 1 }
+ val names: List = parser.paramNamesFromConstructor(ctor)
+ assertEquals(listOf("alternativeWord"), names)
+ val args: Array = parser.parseArguments(clazz.name, names.zip(ctor.parameterTypes), "alternativeWord: Foo bar!")
+ assertArrayEquals(args, arrayOf("Foo bar!"))
+ }
}
\ No newline at end of file
diff --git a/client/rpc/build.gradle b/client/rpc/build.gradle
index ec52494023..95f0ed0fcc 100644
--- a/client/rpc/build.gradle
+++ b/client/rpc/build.gradle
@@ -11,6 +11,9 @@ configurations {
integrationTestCompile.extendsFrom testCompile
integrationTestRuntime.extendsFrom testRuntime
+
+ smokeTestCompile.extendsFrom compile
+ smokeTestRuntime.extendsFrom runtime
}
sourceSets {
@@ -21,6 +24,24 @@ sourceSets {
srcDir file('src/integration-test/kotlin')
}
}
+ smokeTest {
+ kotlin {
+ // We must NOT have any Node code on the classpath, so do NOT
+ // include the test or integrationTest dependencies here.
+ compileClasspath += main.output
+ runtimeClasspath += main.output
+ srcDir file('src/smoke-test/kotlin')
+ }
+ }
+}
+
+processSmokeTestResources {
+ from(file("$rootDir/config/test/log4j2.xml")) {
+ rename 'log4j2\\.xml', 'log4j2-test.xml'
+ }
+ from(project(':node:capsule').tasks.buildCordaJAR) {
+ rename 'corda-(.*)', 'corda.jar'
+ }
}
// To find potential version conflicts, run "gradle htmlDependencyReport" and then look in
@@ -38,11 +59,22 @@ dependencies {
testCompile project(':test-utils')
testCompile project(':client:mock')
- // Integration test helpers
- integrationTestCompile "junit:junit:$junit_version"
+ // Smoke tests do NOT have any Node code on the classpath!
+ smokeTestCompile project(':finance')
+ smokeTestCompile "org.apache.logging.log4j:log4j-slf4j-impl:$log4j_version"
+ smokeTestCompile "org.apache.logging.log4j:log4j-core:$log4j_version"
+ smokeTestCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
+ smokeTestCompile "org.assertj:assertj-core:${assertj_version}"
+ smokeTestCompile "junit:junit:$junit_version"
}
task integrationTest(type: Test) {
testClassesDir = sourceSets.integrationTest.output.classesDir
classpath = sourceSets.integrationTest.runtimeClasspath
}
+
+task smokeTest(type: Test) {
+ testClassesDir = sourceSets.smokeTest.output.classesDir
+ classpath = sourceSets.smokeTest.runtimeClasspath
+ systemProperties['build.dir'] = buildDir
+}
diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt
index 5287a0de4f..3ecb3eda6e 100644
--- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt
+++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt
@@ -5,26 +5,181 @@ import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
+import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
+import net.corda.client.rpc.internal.RPCClient
+import net.corda.client.rpc.internal.RPCClientConfiguration
+import net.corda.core.*
import net.corda.core.messaging.RPCOps
-import net.corda.core.millis
-import net.corda.core.random63BitValue
+import net.corda.node.driver.poll
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.RPCKryo
import net.corda.testing.*
import org.apache.activemq.artemis.api.core.SimpleString
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.time.Duration
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
class RPCStabilityTests {
+ object DummyOps : RPCOps {
+ override val protocolVersion = 0
+ }
+
+ private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Int {
+ val values = ConcurrentLinkedQueue()
+ return poll(executorService, "number of threads to become stable", 250.millis) {
+ values.add(Thread.activeCount())
+ if (values.size > 5) {
+ values.poll()
+ }
+ val first = values.peek()
+ if (values.size == 5 && values.all { it == first }) {
+ first
+ } else {
+ null
+ }
+ }.get()
+ }
+
+ @Test
+ fun `client and server dont leak threads`() {
+ val executor = Executors.newScheduledThreadPool(1)
+ fun startAndStop() {
+ rpcDriver {
+ val server = startRpcServer(ops = DummyOps)
+ startRpcClient(server.get().broker.hostAndPort!!).get()
+ }
+ }
+ repeat(5) {
+ startAndStop()
+ }
+ val numberOfThreadsBefore = waitUntilNumberOfThreadsStable(executor)
+ repeat(5) {
+ startAndStop()
+ }
+ val numberOfThreadsAfter = waitUntilNumberOfThreadsStable(executor)
+ // This is a less than check because threads from other tests may be shutting down while this test is running.
+ // This is therefore a "best effort" check. When this test is run on its own this should be a strict equality.
+ assertTrue(numberOfThreadsBefore >= numberOfThreadsAfter)
+ executor.shutdownNow()
+ }
+
+ @Test
+ fun `client doesnt leak threads when it fails to start`() {
+ val executor = Executors.newScheduledThreadPool(1)
+ fun startAndStop() {
+ rpcDriver {
+ ErrorOr.catch { startRpcClient(HostAndPort.fromString("localhost:9999")).get() }
+ val server = startRpcServer(ops = DummyOps)
+ ErrorOr.catch { startRpcClient(
+ server.get().broker.hostAndPort!!,
+ configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1)
+ ).get() }
+ }
+ }
+ repeat(5) {
+ startAndStop()
+ }
+ val numberOfThreadsBefore = waitUntilNumberOfThreadsStable(executor)
+ repeat(5) {
+ startAndStop()
+ }
+ val numberOfThreadsAfter = waitUntilNumberOfThreadsStable(executor)
+ assertTrue(numberOfThreadsBefore >= numberOfThreadsAfter)
+ executor.shutdownNow()
+ }
+
+ fun RpcBrokerHandle.getStats(): Map {
+ return serverControl.run {
+ mapOf(
+ "connections" to listConnectionIDs().toSet(),
+ "sessionCount" to listConnectionIDs().flatMap { listSessions(it).toList() }.size,
+ "consumerCount" to totalConsumerCount
+ )
+ }
+ }
+
+ @Test
+ fun `rpc server close doesnt leak broker resources`() {
+ rpcDriver {
+ fun startAndCloseServer(broker: RpcBrokerHandle) {
+ startRpcServerWithBrokerRunning(
+ configuration = RPCServerConfiguration.default.copy(consumerPoolSize = 1, producerPoolBound = 1),
+ ops = DummyOps,
+ brokerHandle = broker
+ ).rpcServer.close()
+ }
+
+ val broker = startRpcBroker().get()
+ startAndCloseServer(broker)
+ val initial = broker.getStats()
+ repeat(100) {
+ startAndCloseServer(broker)
+ }
+ pollUntilTrue("broker resources to be released") {
+ initial == broker.getStats()
+ }
+ }
+ }
+
+ @Test
+ fun `rpc client close doesnt leak broker resources`() {
+ rpcDriver {
+ val server = startRpcServer(configuration = RPCServerConfiguration.default.copy(consumerPoolSize = 1, producerPoolBound = 1), ops = DummyOps).get()
+ RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close()
+ val initial = server.broker.getStats()
+ repeat(100) {
+ val connection = RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password)
+ connection.close()
+ }
+ pollUntilTrue("broker resources to be released") {
+ initial == server.broker.getStats()
+ }
+ }
+ }
+
+ @Test
+ fun `rpc server close is idempotent`() {
+ rpcDriver {
+ val server = startRpcServer(ops = DummyOps).get()
+ repeat(10) {
+ server.rpcServer.close()
+ }
+ }
+ }
+
+ @Test
+ fun `rpc client close is idempotent`() {
+ rpcDriver {
+ val serverShutdown = shutdownManager.follower()
+ val server = startRpcServer(ops = DummyOps).get()
+ serverShutdown.unfollow()
+ // With the server up
+ val connection1 = RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password)
+ repeat(10) {
+ connection1.close()
+ }
+ val connection2 = RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password)
+ serverShutdown.shutdown()
+ // With the server down
+ repeat(10) {
+ connection2.close()
+ }
+ }
+ }
+
interface LeakObservableOps: RPCOps {
fun leakObservable(): Observable
}
@@ -42,7 +197,7 @@ class RPCStabilityTests {
}
}
val server = startRpcServer(ops = leakObservableOpsImpl)
- val proxy = startRpcClient(server.get().hostAndPort).get()
+ val proxy = startRpcClient(server.get().broker.hostAndPort!!).get()
// Leak many observables
val N = 200
(1..N).toList().parallelStream().forEach {
@@ -57,6 +212,31 @@ class RPCStabilityTests {
}
}
+ interface ReconnectOps : RPCOps {
+ fun ping(): String
+ }
+
+ @Test
+ fun `client reconnects to rebooted server`() {
+ rpcDriver {
+ val ops = object : ReconnectOps {
+ override val protocolVersion = 0
+ override fun ping() = "pong"
+ }
+ val serverFollower = shutdownManager.follower()
+ val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!!
+ serverFollower.unfollow()
+ val clientFollower = shutdownManager.follower()
+ val client = startRpcClient(serverPort).getOrThrow()
+ clientFollower.unfollow()
+ assertEquals("pong", client.ping())
+ serverFollower.shutdown()
+ startRpcServer(ops = ops, customPort = serverPort).getOrThrow()
+ assertEquals("pong", client.ping())
+ clientFollower.shutdown() // Driver would do this after the new server, causing hang.
+ }
+ }
+
interface TrackSubscriberOps : RPCOps {
fun subscribe(): Observable
}
@@ -86,7 +266,7 @@ class RPCStabilityTests {
val numberOfClients = 4
val clients = Futures.allAsList((1 .. numberOfClients).map {
- startRandomRpcClient(server.hostAndPort)
+ startRandomRpcClient(server.broker.hostAndPort!!)
}).get()
// Poll until all clients connect
@@ -131,7 +311,7 @@ class RPCStabilityTests {
// Construct an RPC session manually so that we can hang in the message handler
val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}"
- val session = startArtemisSession(server.hostAndPort)
+ val session = startArtemisSession(server.broker.hostAndPort!!)
session.createTemporaryQueue(myQueue, myQueue)
val consumer = session.createConsumer(myQueue, null, -1, -1, false)
consumer.setMessageHandler {
@@ -163,7 +343,7 @@ class RPCStabilityTests {
fun RPCDriverExposedDSLInterface.pollUntilClientNumber(server: RpcServerHandle, expected: Int) {
pollUntilTrue("number of RPC clients to become $expected") {
- val clientAddresses = server.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) }
+ val clientAddresses = server.broker.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) }
clientAddresses.size == expected
}.get()
}
\ No newline at end of file
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
index a6c97c3e3a..199cdd6d67 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
@@ -9,10 +9,12 @@ import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.config.SSLConfiguration
import java.time.Duration
+/** @see RPCClient.RPCConnection */
class CordaRPCConnection internal constructor(
connection: RPCClient.RPCConnection
) : RPCClient.RPCConnection by connection
+/** @see RPCClientConfiguration */
data class CordaRPCClientConfiguration(
val connectionMaxRetryInterval: Duration
) {
@@ -29,6 +31,7 @@ data class CordaRPCClientConfiguration(
}
}
+/** @see RPCClient */
class CordaRPCClient(
hostAndPort: HostAndPort,
sslConfiguration: SSLConfiguration? = null,
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt
index 60d50928bd..3e52dbd946 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt
@@ -53,10 +53,12 @@ data class RPCClientConfiguration(
val connectionRetryIntervalMultiplier: Double,
/** Maximum retry interval */
val connectionMaxRetryInterval: Duration,
+ val maxReconnectAttempts: Int,
/** Maximum file size */
val maxFileSize: Int
) {
companion object {
+ val unlimitedReconnectAttempts = -1
@JvmStatic
val default = RPCClientConfiguration(
minimumServerProtocolVersion = 0,
@@ -68,6 +70,7 @@ data class RPCClientConfiguration(
connectionRetryInterval = 5.seconds,
connectionRetryIntervalMultiplier = 1.5,
connectionMaxRetryInterval = 3.minutes,
+ maxReconnectAttempts = unlimitedReconnectAttempts,
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
maxFileSize = 10485760
)
@@ -114,9 +117,9 @@ class RPCClient(
*
* The [RPCOps] defines what client RPCs are available. If an RPC returns an [Observable] anywhere in the object
* graph returned then the server-side observable is transparently forwarded to the client side here.
- * *You are expected to use it*. The server will begin buffering messages immediately that it will expect you to
- * drain by subscribing to the returned observer. You can opt-out of this by simply calling the
- * [net.corda.client.rpc.notUsed] method on it. You don't have to explicitly close the observable if you actually
+ * *You are expected to use it*. The server will begin sending messages immediately that will be buffered on the
+ * client, you are expected to drain by subscribing to the returned observer. You can opt-out of this by simply
+ * calling the [net.corda.client.rpc.notUsed] method on it. You don't have to explicitly close the observable if you actually
* subscribe to it: it will close itself and free up the server-side resources either when the client or JVM itself
* is shutdown, or when there are no more subscribers to it. Once all the subscribers to a returned observable are
* unsubscribed or the observable completes successfully or with an error, the observable is closed and you can't
@@ -139,30 +142,37 @@ class RPCClient(
retryInterval = rpcConfiguration.connectionRetryInterval.toMillis()
retryIntervalMultiplier = rpcConfiguration.connectionRetryIntervalMultiplier
maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval.toMillis()
+ reconnectAttempts = rpcConfiguration.maxReconnectAttempts
minLargeMessageSize = rpcConfiguration.maxFileSize
}
val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, clientAddress, rpcOpsClass)
- proxyHandler.start()
+ try {
+ proxyHandler.start()
- @Suppress("UNCHECKED_CAST")
- val ops = Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler) as I
+ @Suppress("UNCHECKED_CAST")
+ val ops = Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler) as I
- val serverProtocolVersion = ops.protocolVersion
- if (serverProtocolVersion < rpcConfiguration.minimumServerProtocolVersion) {
- throw RPCException("Requested minimum protocol version (${rpcConfiguration.minimumServerProtocolVersion}) is higher" +
- " than the server's supported protocol version ($serverProtocolVersion)")
- }
- proxyHandler.setServerProtocolVersion(serverProtocolVersion)
-
- log.debug("RPC connected, returning proxy")
- object : RPCConnection {
- override val proxy = ops
- override val serverProtocolVersion = serverProtocolVersion
- override fun close() {
- proxyHandler.close()
- serverLocator.close()
+ val serverProtocolVersion = ops.protocolVersion
+ if (serverProtocolVersion < rpcConfiguration.minimumServerProtocolVersion) {
+ throw RPCException("Requested minimum protocol version (${rpcConfiguration.minimumServerProtocolVersion}) is higher" +
+ " than the server's supported protocol version ($serverProtocolVersion)")
}
+ proxyHandler.setServerProtocolVersion(serverProtocolVersion)
+
+ log.debug("RPC connected, returning proxy")
+ object : RPCConnection {
+ override val proxy = ops
+ override val serverProtocolVersion = serverProtocolVersion
+ override fun close() {
+ proxyHandler.close()
+ serverLocator.close()
+ }
+ }
+ } catch (exception: Throwable) {
+ proxyHandler.close()
+ serverLocator.close()
+ throw exception
}
}
}
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
index da95f01b4d..2fde6fcaaf 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
@@ -25,16 +25,11 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Notification
import rx.Observable
import rx.subjects.UnicastSubject
-import sun.reflect.CallerSensitive
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import java.util.*
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.Executors
-import java.util.concurrent.ScheduledFuture
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
-import kotlin.collections.ArrayList
import kotlin.reflect.jvm.javaMethod
/**
@@ -81,16 +76,13 @@ class RPCClientProxyHandler(
val log = loggerFor()
// Note that this KryoPool is not yet capable of deserialising Observables, it requires Proxy-specific context
// to do that. However it may still be used for serialisation of RPC requests and related messages.
- val kryoPool = KryoPool.Builder { RPCKryo(RpcClientObservableSerializer) }.build()
+ val kryoPool: KryoPool = KryoPool.Builder { RPCKryo(RpcClientObservableSerializer) }.build()
// To check whether toString() is being invoked
val toStringMethod: Method = Object::toString.javaMethod!!
}
// Used for reaping
- private val reaperExecutor = Executors.newScheduledThreadPool(
- 1,
- ThreadFactoryBuilder().setNameFormat("rpc-client-reaper-%d").build()
- )
+ private var reaperExecutor: ScheduledExecutorService? = null
// A sticky pool for running Observable.onNext()s. We need the stickiness to preserve the observation ordering.
private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").build()
@@ -109,7 +101,7 @@ class RPCClientProxyHandler(
hardReferenceStore = Collections.synchronizedSet(mutableSetOf>())
)
// Holds a reference to the scheduled reaper.
- private lateinit var reaperScheduledFuture: ScheduledFuture<*>
+ private var reaperScheduledFuture: ScheduledFuture<*>? = null
// The protocol version of the server, to be initialised to the value of [RPCOps.protocolVersion]
private var serverProtocolVersion: Int? = null
@@ -145,7 +137,7 @@ class RPCClientProxyHandler(
// TODO We may need to pool these somehow anyway, otherwise if the server sends many big messages in parallel a
// single consumer may be starved for flow control credits. Recheck this once Artemis's large message streaming is
// integrated properly.
- private lateinit var sessionAndConsumer: ArtemisConsumer
+ private var sessionAndConsumer: ArtemisConsumer? = null
// Pool producers to reduce contention on the client side.
private val sessionAndProducerPool = LazyPool(bound = rpcConfiguration.producerPoolBound) {
// Note how we create new sessions *and* session factories per producer.
@@ -162,7 +154,12 @@ class RPCClientProxyHandler(
* Start the client. This creates the per-client queue, starts the consumer session and the reaper.
*/
fun start() {
- reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate(
+ lifeCycle.requireState(State.UNSTARTED)
+ reaperExecutor = Executors.newScheduledThreadPool(
+ 1,
+ ThreadFactoryBuilder().setNameFormat("rpc-client-reaper-%d").build()
+ )
+ reaperScheduledFuture = reaperExecutor!!.scheduleAtFixedRate(
this::reapObservables,
rpcConfiguration.reapInterval.toMillis(),
rpcConfiguration.reapInterval.toMillis(),
@@ -187,7 +184,7 @@ class RPCClientProxyHandler(
if (method == toStringMethod) {
return "Client RPC proxy for $rpcOpsClass"
}
- if (sessionAndConsumer.session.isClosed) {
+ if (sessionAndConsumer!!.session.isClosed) {
throw RPCException("RPC Proxy is closed")
}
val rpcId = RPCApi.RpcRequestId(random63BitValue())
@@ -211,6 +208,12 @@ class RPCClientProxyHandler(
it.session.commit()
}
return replyFuture.getOrThrow()
+ } catch (e: RuntimeException) {
+ // Already an unchecked exception, so just rethrow it
+ throw e
+ } catch (e: Exception) {
+ // This must be a checked exception, so wrap it
+ throw RPCException(e.message ?: "", e)
} finally {
callSiteMap?.remove(rpcId.toLong)
}
@@ -268,24 +271,19 @@ class RPCClientProxyHandler(
* Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors.
*/
fun close() {
- sessionAndConsumer.consumer.close()
- sessionAndConsumer.session.close()
- sessionAndConsumer.sessionFactory.close()
- reaperScheduledFuture.cancel(false)
+ sessionAndConsumer?.sessionFactory?.close()
+ reaperScheduledFuture?.cancel(false)
observableContext.observableMap.invalidateAll()
reapObservables()
- reaperExecutor.shutdownNow()
+ reaperExecutor?.shutdownNow()
sessionAndProducerPool.close().forEach {
- it.producer.close()
- it.session.close()
it.sessionFactory.close()
}
// Note the ordering is important, we shut down the consumer *before* the observation executor, otherwise we may
// leak borrowed executors.
val observationExecutors = observationExecutorPool.close()
observationExecutors.forEach { it.shutdownNow() }
- observationExecutors.forEach { it.awaitTermination(100, TimeUnit.MILLISECONDS) }
- lifeCycle.transition(State.STARTED, State.FINISHED)
+ lifeCycle.justTransition(State.FINISHED)
}
/**
diff --git a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/NodeConfig.kt b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/NodeConfig.kt
new file mode 100644
index 0000000000..75c4074be1
--- /dev/null
+++ b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/NodeConfig.kt
@@ -0,0 +1,48 @@
+package net.corda.kotlin.rpc
+
+import com.typesafe.config.*
+import net.corda.core.crypto.commonName
+import net.corda.core.identity.Party
+import net.corda.nodeapi.User
+
+class NodeConfig(
+ val party: Party,
+ val p2pPort: Int,
+ val rpcPort: Int,
+ val webPort: Int,
+ val extraServices: List<String>,
+ val users: List<User>,
+ var networkMap: NodeConfig? = null
+) {
+ companion object {
+ val renderOptions: ConfigRenderOptions = ConfigRenderOptions.defaults().setOriginComments(false)
+ }
+
+ val commonName: String = party.name.commonName
+
+ /*
+ * The configuration object depends upon the networkMap,
+ * which is mutable.
+ */
+ fun toFileConfig(): Config = ConfigFactory.empty()
+ .withValue("myLegalName", valueFor(party.name.toString()))
+ .withValue("p2pAddress", addressValueFor(p2pPort))
+ .withValue("extraAdvertisedServiceIds", valueFor(extraServices))
+ .withFallback(optional("networkMapService", networkMap, { c, n ->
+ c.withValue("address", addressValueFor(n.p2pPort))
+ .withValue("legalName", valueFor(n.party.name.toString()))
+ }))
+ .withValue("webAddress", addressValueFor(webPort))
+ .withValue("rpcAddress", addressValueFor(rpcPort))
+ .withValue("rpcUsers", valueFor(users.map(User::toMap).toList()))
+ .withValue("useTestClock", valueFor(true))
+
+ fun toText(): String = toFileConfig().root().render(renderOptions)
+
+ private fun <T> valueFor(any: T): ConfigValue? = ConfigValueFactory.fromAnyRef(any)
+ private fun addressValueFor(port: Int) = valueFor("localhost:$port")
+ private inline fun <T> optional(path: String, obj: T?, body: (Config, T) -> Config): Config {
+ val config = ConfigFactory.empty()
+ return if (obj == null) config else body(config, obj).atPath(path)
+ }
+}
diff --git a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/NodeProcess.kt b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/NodeProcess.kt
new file mode 100644
index 0000000000..3d81f9ad83
--- /dev/null
+++ b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/NodeProcess.kt
@@ -0,0 +1,107 @@
+package net.corda.kotlin.rpc
+
+import com.google.common.net.HostAndPort
+import net.corda.client.rpc.CordaRPCClient
+import net.corda.client.rpc.CordaRPCConnection
+import net.corda.core.utilities.loggerFor
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.Path
+import java.nio.file.Paths
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit.SECONDS
+import kotlin.test.*
+
+class NodeProcess(
+ val config: NodeConfig,
+ val nodeDir: Path,
+ private val node: Process,
+ private val client: CordaRPCClient
+) : AutoCloseable {
+ private companion object {
+ val log = loggerFor<NodeProcess>()
+ val javaPath: Path = Paths.get(System.getProperty("java.home"), "bin", "java")
+ val corda = File(this::class.java.getResource("/corda.jar").toURI())
+ val buildDir: Path = Paths.get(System.getProperty("build.dir"))
+ val capsuleDir: Path = buildDir.resolve("capsule")
+ }
+
+ fun connect(): CordaRPCConnection {
+ val user = config.users[0]
+ return client.start(user.username, user.password)
+ }
+
+ override fun close() {
+ log.info("Stopping node '${config.commonName}'")
+ node.destroy()
+ if (!node.waitFor(60, SECONDS)) {
+ log.warn("Node '${config.commonName}' has not shutdown correctly")
+ node.destroyForcibly()
+ }
+
+ log.info("Deleting Artemis directories, because they're large!")
+ nodeDir.resolve("artemis").toFile().deleteRecursively()
+ }
+
+ class Factory(val nodesDir: Path) {
+ init {
+ assertTrue(nodesDir.toFile().forceDirectory(), "Directory '$nodesDir' does not exist")
+ }
+
+ fun create(config: NodeConfig): NodeProcess {
+ val nodeDir = Files.createTempDirectory(nodesDir, config.commonName)
+ log.info("Node directory: {}", nodeDir)
+
+ val confFile = nodeDir.resolve("node.conf").toFile()
+ confFile.writeText(config.toText())
+
+ val process = startNode(nodeDir)
+ val client = CordaRPCClient(HostAndPort.fromParts("localhost", config.rpcPort))
+ val user = config.users[0]
+
+ val setupExecutor = Executors.newSingleThreadScheduledExecutor()
+ try {
+ setupExecutor.scheduleWithFixedDelay({
+ try {
+ if (!process.isAlive) {
+ log.error("Node '${config.commonName}' has died.")
+ return@scheduleWithFixedDelay
+ }
+ val conn = client.start(user.username, user.password)
+ conn.close()
+
+ // Cancel the "setup" task now that we've created the RPC client.
+ setupExecutor.shutdown()
+ } catch (e: Exception) {
+ log.warn("Node '{}' not ready yet (Error: {})", config.commonName, e.message)
+ }
+ }, 5, 1, SECONDS)
+
+ val setupOK = setupExecutor.awaitTermination(120, SECONDS)
+ assertTrue(setupOK && process.isAlive, "Failed to create RPC connection")
+ } catch (e: Exception) {
+ process.destroyForcibly()
+ throw e
+ } finally {
+ setupExecutor.shutdownNow()
+ }
+
+ return NodeProcess(config, nodeDir, process, client)
+ }
+
+ private fun startNode(nodeDir: Path): Process {
+ val builder = ProcessBuilder()
+ .command(javaPath.toString(), "-jar", corda.path)
+ .directory(nodeDir.toFile())
+
+ builder.environment().putAll(mapOf(
+ "CAPSULE_CACHE_DIR" to capsuleDir.toString()
+ ))
+
+ return builder.start()
+ }
+ }
+}
+
+private fun File.forceDirectory(): Boolean = this.isDirectory || this.mkdirs()
+
diff --git a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt
new file mode 100644
index 0000000000..f7863c4029
--- /dev/null
+++ b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt
@@ -0,0 +1,158 @@
+package net.corda.kotlin.rpc
+
+import java.io.FilterInputStream
+import java.io.InputStream
+import java.nio.file.Path
+import java.nio.file.Paths
+import java.time.Duration.ofSeconds
+import java.util.Currency
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.*
+import net.corda.client.rpc.CordaRPCConnection
+import net.corda.client.rpc.notUsed
+import net.corda.core.contracts.*
+import net.corda.core.getOrThrow
+import net.corda.core.identity.Party
+import net.corda.core.messaging.CordaRPCOps
+import net.corda.core.messaging.StateMachineUpdate
+import net.corda.core.messaging.startFlow
+import net.corda.core.messaging.startTrackedFlow
+import net.corda.core.serialization.OpaqueBytes
+import net.corda.core.sizedInputStreamAndHash
+import net.corda.core.utilities.DUMMY_NOTARY
+import net.corda.core.utilities.loggerFor
+import net.corda.flows.CashIssueFlow
+import net.corda.nodeapi.User
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+
+class StandaloneCordaRPClientTest {
+ private companion object {
+ val log = loggerFor<StandaloneCordaRPClientTest>()
+ val buildDir: Path = Paths.get(System.getProperty("build.dir"))
+ val nodesDir: Path = buildDir.resolve("nodes")
+ val user = User("user1", "test", permissions = setOf("ALL"))
+ val factory = NodeProcess.Factory(nodesDir)
+ val port = AtomicInteger(15000)
+ const val attachmentSize = 2116
+ const val timeout = 60L
+ }
+
+ private lateinit var notary: NodeProcess
+ private lateinit var rpcProxy: CordaRPCOps
+ private lateinit var connection: CordaRPCConnection
+ private lateinit var notaryIdentity: Party
+
+ private val notaryConfig = NodeConfig(
+ party = DUMMY_NOTARY,
+ p2pPort = port.andIncrement,
+ rpcPort = port.andIncrement,
+ webPort = port.andIncrement,
+ extraServices = listOf("corda.notary.validating"),
+ users = listOf(user)
+ )
+
+ @Before
+ fun setUp() {
+ notary = factory.create(notaryConfig)
+ connection = notary.connect()
+ rpcProxy = connection.proxy
+ notaryIdentity = fetchNotaryIdentity()
+ }
+
+ @After
+ fun done() {
+ try {
+ connection.close()
+ } finally {
+ notary.close()
+ }
+ }
+
+ @Test
+ fun `test attachment upload`() {
+ val attachment = sizedInputStreamAndHash(attachmentSize)
+ assertFalse(rpcProxy.attachmentExists(attachment.sha256))
+ val id = WrapperStream(attachment.inputStream).use { rpcProxy.uploadAttachment(it) }
+ assertEquals(id, attachment.sha256, "Attachment has incorrect SHA256 hash")
+ }
+
+ @Test
+ fun `test starting flow`() {
+ rpcProxy.startFlow(::CashIssueFlow, 127.POUNDS, OpaqueBytes.of(0), notaryIdentity, notaryIdentity)
+ .returnValue.getOrThrow(ofSeconds(timeout))
+ }
+
+ @Test
+ fun `test starting tracked flow`() {
+ var trackCount = 0
+ val handle = rpcProxy.startTrackedFlow(
+ ::CashIssueFlow, 429.DOLLARS, OpaqueBytes.of(0), notaryIdentity, notaryIdentity
+ )
+ handle.progress.subscribe { msg ->
+ log.info("Flow>> $msg")
+ ++trackCount
+ }
+ handle.returnValue.getOrThrow(ofSeconds(timeout))
+ assertNotEquals(0, trackCount)
+ }
+
+ @Test
+ fun `test network map`() {
+ assertEquals(DUMMY_NOTARY.name, notaryIdentity.name)
+ }
+
+ @Test
+ fun `test state machines`() {
+ val (stateMachines, updates) = rpcProxy.stateMachinesAndUpdates()
+ assertEquals(0, stateMachines.size)
+
+ var updateCount = 0
+ updates.subscribe { update ->
+ if (update is StateMachineUpdate.Added) {
+ log.info("StateMachine>> Id=${update.id}")
+ ++updateCount
+ }
+ }
+
+ // Now issue some cash
+ rpcProxy.startFlow(::CashIssueFlow, 513.SWISS_FRANCS, OpaqueBytes.of(0), notaryIdentity, notaryIdentity)
+ .returnValue.getOrThrow(ofSeconds(timeout))
+ assertEquals(1, updateCount)
+ }
+
+ @Test
+ fun `test vault`() {
+ val (vault, vaultUpdates) = rpcProxy.vaultAndUpdates()
+ assertEquals(0, vault.size)
+
+ var updateCount = 0
+ vaultUpdates.subscribe { update ->
+ log.info("Vault>> FlowId=${update.flowId}")
+ ++updateCount
+ }
+
+ // Now issue some cash
+ rpcProxy.startFlow(::CashIssueFlow, 629.POUNDS, OpaqueBytes.of(0), notaryIdentity, notaryIdentity)
+ .returnValue.getOrThrow(ofSeconds(timeout))
+ assertNotEquals(0, updateCount)
+
+ // Check that this cash exists in the vault
+ val cashBalance = rpcProxy.getCashBalances()
+ log.info("Cash Balances: $cashBalance")
+ assertEquals(1, cashBalance.size)
+ assertEquals(629.POUNDS, cashBalance[Currency.getInstance("GBP")])
+ }
+
+
+ private fun fetchNotaryIdentity(): Party {
+ val (nodeInfo, nodeUpdates) = rpcProxy.networkMapUpdates()
+ nodeUpdates.notUsed()
+ assertEquals(1, nodeInfo.size)
+ return nodeInfo[0].legalIdentity
+ }
+
+ // This InputStream cannot have been whitelisted.
+ private class WrapperStream(input: InputStream) : FilterInputStream(input)
+}
diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt
index 6139ad79fb..20026ab7c1 100644
--- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt
+++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt
@@ -47,8 +47,8 @@ open class AbstractRPCTest {
}.get()
RPCTestMode.Netty ->
startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { server ->
- startRpcClient(server.hostAndPort, rpcUser.username, rpcUser.password, clientConfiguration).map {
- TestProxy(it, { startArtemisSession(server.hostAndPort, rpcUser.username, rpcUser.password) })
+ startRpcClient(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map {
+ TestProxy(it, { startArtemisSession(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password) })
}
}.get()
}
diff --git a/constants.properties b/constants.properties
index 6312235f73..74d2be1352 100644
--- a/constants.properties
+++ b/constants.properties
@@ -1,4 +1,4 @@
-gradlePluginsVersion=0.12.0
+gradlePluginsVersion=0.12.1
kotlinVersion=1.1.2
guavaVersion=21.0
bouncycastleVersion=1.56
diff --git a/cordform-common/build.gradle b/cordform-common/build.gradle
new file mode 100644
index 0000000000..c3c1676b23
--- /dev/null
+++ b/cordform-common/build.gradle
@@ -0,0 +1,15 @@
+apply plugin: 'java'
+apply plugin: 'maven-publish'
+apply plugin: 'net.corda.plugins.publish-utils'
+
+repositories {
+ mavenCentral()
+}
+
+dependencies {
+ // TypeSafe Config: for simple and human friendly config files.
+ compile "com.typesafe:config:$typesafe_config_version"
+
+ // Bouncy Castle: for X.500 distinguished name manipulation
+ compile "org.bouncycastle:bcprov-jdk15on:$bouncycastle_version"
+}
diff --git a/cordform-common/src/main/java/net/corda/cordform/CordformContext.java b/cordform-common/src/main/java/net/corda/cordform/CordformContext.java
new file mode 100644
index 0000000000..c127392f5c
--- /dev/null
+++ b/cordform-common/src/main/java/net/corda/cordform/CordformContext.java
@@ -0,0 +1,8 @@
+package net.corda.cordform;
+
+import org.bouncycastle.asn1.x500.X500Name;
+import java.nio.file.Path;
+
+public interface CordformContext {
+ Path baseDirectory(X500Name nodeName);
+}
diff --git a/cordform-common/src/main/java/net/corda/cordform/CordformDefinition.java b/cordform-common/src/main/java/net/corda/cordform/CordformDefinition.java
new file mode 100644
index 0000000000..85a171f8fa
--- /dev/null
+++ b/cordform-common/src/main/java/net/corda/cordform/CordformDefinition.java
@@ -0,0 +1,27 @@
+package net.corda.cordform;
+
+import org.bouncycastle.asn1.x500.X500Name;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.function.Consumer;
+
+public abstract class CordformDefinition {
+ public final Path driverDirectory;
+ public final ArrayList<Consumer<? super CordformNode>> nodeConfigurers = new ArrayList<>();
+ public final X500Name networkMapNodeName;
+
+ public CordformDefinition(Path driverDirectory, X500Name networkMapNodeName) {
+ this.driverDirectory = driverDirectory;
+ this.networkMapNodeName = networkMapNodeName;
+ }
+
+ public void addNode(Consumer<? super CordformNode> configurer) {
+ nodeConfigurers.add(configurer);
+ }
+
+ /**
+ * Make arbitrary changes to the node directories before they are started.
+ * @param context Lookup of node directory by node name.
+ */
+ public abstract void setup(CordformContext context);
+}
diff --git a/cordform-common/src/main/java/net/corda/cordform/CordformNode.java b/cordform-common/src/main/java/net/corda/cordform/CordformNode.java
new file mode 100644
index 0000000000..d8160d70cc
--- /dev/null
+++ b/cordform-common/src/main/java/net/corda/cordform/CordformNode.java
@@ -0,0 +1,92 @@
+package net.corda.cordform;
+
+import static java.util.Collections.emptyList;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.util.List;
+import java.util.Map;
+
+public class CordformNode {
+ protected static final String DEFAULT_HOST = "localhost";
+
+ /**
+ * Name of the node.
+ */
+ private String name;
+
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * A list of advertised services ID strings.
+ */
+ public List<String> advertisedServices = emptyList();
+
+ /**
+ * If running a distributed notary, a list of node addresses for joining the Raft cluster
+ */
+ public List<String> notaryClusterAddresses = emptyList();
+ /**
+ * Set the RPC users for this node. This configuration block allows arbitrary configuration.
+ * The recommended current structure is:
+ * [[['username': "username_here", 'password': "password_here", 'permissions': ["permissions_here"]]]
+ * The above is a list to a map of keys to values using Groovy map and list shorthands.
+ *
+ * Incorrect configurations will not cause a DSL error.
+ */
+ public List