Moved RPC Proxy to own module in testing/qa/behave/tools

Incorporating PR review feedback.

Minor fixes to default path and additional copying of startup script.
This commit is contained in:
josecoll 2018-02-16 19:23:47 +00:00
parent bb4dab7451
commit 764af04608
15 changed files with 1166 additions and 0 deletions

2
.idea/compiler.xml generated
View File

@ -163,6 +163,8 @@
<module name="testing-test-common_test" target="1.8" />
<module name="testing-test-utils_main" target="1.8" />
<module name="testing-test-utils_test" target="1.8" />
<module name="testing_main" target="1.8" />
<module name="testing_test" target="1.8" />
<module name="tools_main" target="1.8" />
<module name="tools_test" target="1.8" />
<module name="trader-demo_integrationTest" target="1.8" />

View File

@ -43,6 +43,7 @@ include 'perftestcordapp'
['test-common', 'test-utils', 'smoke-test-utils', 'node-driver'].each {
project(":$it").projectDir = new File("$settingsDir/testing/$it")
}
include 'testing:qa:behave:tools:rpc-proxy'
include 'network-management'
include 'network-management:capsule'
include 'network-management:capsule-hsm'

View File

@ -0,0 +1,39 @@
#!/bin/bash
# Please ensure you run this script using source code (eg. GitHub master, branch or TAG) that reflects the version label defined below
# For example:
# corda-master => git clone https://github.com/corda/corda
# r3corda-master => git clone https://github.com/corda/enterprise
# Please run this script from the corda source code directory
# eg. $ pwd
# /myprojects/corda
# $ testing/qa/behave/scripts/update-corda-cts.sh
VERSION=master
BUILD_DIR=`pwd`
STAGING_DIR=~/staging/corda/corda-${VERSION}
# Set up directories
mkdir -p ${STAGING_DIR}/apps
cd ${BUILD_DIR}
echo "*************************************************************"
echo "Building and installing $VERSION from $BUILD_DIR"
echo " to $STAGING_DIR"
echo "*************************************************************"
# Copy Corda capsule into deps
./gradlew clean install
cp -v $(ls node/capsule/build/libs/corda-*.jar | tail -n1) ${STAGING_DIR}/corda.jar
cp -v $(ls finance/build/libs/corda-finance-*.jar | tail -n1) ${STAGING_DIR}/apps
# Build Network Bootstrapper
./gradlew buildBootstrapperJar
cp -v $(ls tools/bootstrapper/build/libs/*.jar | tail -n1) ${STAGING_DIR}/network-bootstrapper.jar
# Build rpcProxy (required by CTS Scenario Driver to call Corda 3.0 which continues to use Kryo for RPC)
./gradlew testing:qa:behave:tools:rpc-proxy:rpcProxyJar
cp -v $(ls testing/qa/behave/tools/rpc-proxy/build/libs/corda-rpcProxy*.jar | tail -n1) ${STAGING_DIR}/corda-rpcProxy.jar
cp -v testing/qa/behave/tools/rpc-proxy/startRPCproxy.sh ${STAGING_DIR}

View File

@ -0,0 +1,35 @@
#!/bin/bash
# Please ensure you run this script using source code (eg. GitHub master, branch or TAG) that reflects the version label defined below
# For example:
# corda-master => git clone https://github.com/corda/corda
# r3corda-master => git clone https://github.com/corda/enterprise
# Please run this script from the corda source code directory
# eg. $ pwd
# /myprojects/corda
# $ testing/qa/behave/scripts/update-cordapps-cts.sh
VERSION=master
BUILD_DIR=`pwd`
SAMPLES_ROOT=`pwd`/../samples
STAGING_DIR=~/staging/corda/corda-${VERSION}
# Set up directories
mkdir -p ${STAGING_DIR}/apps
mkdir -p ${STAGING_DIR}/proxy
# Corda repository pinned cordapps
cd ${BUILD_DIR}
./gradlew samples:simm-valuation-demo:jar
cp -v $(ls samples/simm-valuation-demo/build/libs/simm-valuation-demo-*.jar | tail -n1) ${STAGING_DIR}/apps
# Independent cordapps
cd ${SAMPLES_ROOT}
# Options sample
cd cordapp-option
./gradlew clean jar publishToMavenLocal
cp -v $(ls build/libs/cordapp-option-*.jar | tail -n1) ${STAGING_DIR}/apps
cp -v $(ls build/libs/cordapp-option-*.jar | tail -n1) ${STAGING_DIR}/proxy

View File

@ -0,0 +1,30 @@
#!/bin/bash
# Please ensure you run this script using source code (eg. GitHub master, branch or TAG) that reflects the version label defined below
# For example:
# corda-master => git clone https://github.com/corda/corda
# r3corda-master => git clone https://github.com/corda/enterprise
# from v4.x.x => https://github.com/corda/corda-gradle-plugins
# Please run this script from the corda source code directory
# eg. $ pwd
# /myprojects/r3corda
# $ testing/qa/behave/scripts/update-gradle-plugins.sh
VERSION=master
BUILD_DIR=`pwd`
STAGING_DIR=~/staging/corda/corda-${VERSION}
# uses gradle plugins directory
cd ${BUILD_DIR}
# update Gradle plugins (note this is being moved to a new repo https://github.com/corda/corda-gradle-plugins from v4.x.x )
echo "*************************************************************"
echo "Building and installing `cat constants.properties | grep gradlePluginsVersion`"
echo "*************************************************************"
cd publish-utils
../gradlew -u install
cd ../
./gradlew install
cd ..

View File

@ -0,0 +1,45 @@
#!/bin/bash
# Please ensure you run this script using source code (eg. GitHub master, branch or TAG) that reflects the version label defined below
# For example:
# corda-master => git clone https://github.com/corda/corda
# r3corda-master => git clone https://github.com/corda/enterprise
# Please run this script from the corda source code directory
# eg. $ pwd
# /myprojects/r3corda
# $ testing/qa/behave/scripts/update-r3corda-cts.sh
VERSION=master
BUILD_DIR=`pwd`
STAGING_DIR=~/staging/corda/r3corda-${VERSION}
# Set up directories
mkdir -p ${STAGING_DIR}/apps
cd ${BUILD_DIR}
echo "*************************************************************"
echo "Building and installing $VERSION from $BUILD_DIR"
echo "to $STAGING_DIR"
echo "*************************************************************"
# Copy Corda capsule into deps
./gradlew clean install
cp -v $(ls node/capsule/build/libs/corda-*.jar | tail -n1) ${STAGING_DIR}/corda.jar
# Copy Corda libraries into apps
cp -v $(ls finance/build/libs/corda-finance-*.jar | tail -n1) ${STAGING_DIR}/apps
# build and distribute Doorman/NMS
./gradlew network-management:capsule:buildDoormanJAR
cp -v $(ls network-management/capsule/build/libs/doorman-*.jar | tail -n1) ${STAGING_DIR}/doorman.jar
# build and distribute DB Migration tool
./gradlew tools:dbmigration:shadowJar
cp -v $(ls tools/dbmigration/build/libs/*migration-*.jar | tail -n1) ${STAGING_DIR}/dbmigration.jar
# Build rpcProxy (required by CTS Scenario Driver to call Corda 3.0 which continues to use Kryo for RPC)
./gradlew testing:qa:behave:tools:rpc-proxy:rpcProxyJar
cp -v $(ls testing/qa/behave/tools/rpc-proxy/build/libs/corda-rpcProxy*.jar | tail -n1) ${STAGING_DIR}/corda-rpcProxy.jar
cp -v testing/qa/behave/tools/rpc-proxy/startRPCproxy.sh ${STAGING_DIR}

View File

@ -0,0 +1,40 @@
#!/bin/bash
# Please ensure you run this script using source code (eg. GitHub master, branch or TAG) that reflects the version label defined below
# For example:
# corda-master => git clone https://github.com/corda/corda
# r3corda-master => git clone https://github.com/corda/enterprise
# Please run this script from the corda source code directory
# eg. $ pwd
# /myprojects/r3corda
# $ testing/qa/behave/scripts/update-r3cordapps-cts.sh
VERSION=master
BUILD_DIR=`pwd`
SAMPLES_ROOT=`pwd`/../samples
STAGING_DIR=~/staging/corda/r3corda-${VERSION}
# Set up directories
mkdir -p ${STAGING_DIR}/apps
# Corda repository pinned cordapps
cd ${BUILD_DIR}
./gradlew samples:simm-valuation-demo:jar
cp -v $(ls samples/simm-valuation-demo/build/libs/simm-valuation-demo-*.jar | tail -n1) ${STAGING_DIR}/apps/simm-valuation-demo.jar
cp -v $(ls samples/simm-valuation-demo/build/libs/simm-valuation-demo-*.jar | tail -n1) ${STAGING_DIR}/proxy/simm-valuation-demo.jar
# R3 Corda only
./gradlew tools:notaryhealthcheck:cordaCompileJar
cp -v $(ls tools/notaryhealthcheck/build/libs/notaryhealthcheck-*.jar | tail -n1) ${STAGING_DIR}/apps/notaryhealthcheck.jar
cp -v $(ls tools/notaryhealthcheck/build/libs/notaryhealthcheck-*.jar | tail -n1) ${STAGING_DIR}/proxy/notaryhealthcheck.jar
# Independent cordapps
cd ${SAMPLES_ROOT}
# Options sample
cd cordapp-option
./gradlew clean jar publishToMavenLocal
cp -v $(ls build/libs/cordapp-option-*.jar | tail -n1) ${STAGING_DIR}/apps
cp -v $(ls build/libs/cordapp-option-*.jar | tail -n1) ${STAGING_DIR}/proxy

View File

@ -0,0 +1,25 @@
#!/bin/bash
# Please ensure you run this script using source code (eg. GitHub master, branch or TAG) that reflects the version label defined below
# For example:
# corda-master => git clone https://github.com/corda/corda
# r3corda-master => git clone https://github.com/corda/enterprise
# Please run this script from the corda source code directory
# eg. $ pwd
# /myprojects/r3corda
# $ testing/qa/behave/scripts/update-rpcProxy-cts.sh
VERSION=master
BUILD_DIR=`pwd`
STAGING_DIR=~/staging/corda/corda-${VERSION}
# Set up directories
mkdir -p ${STAGING_DIR}/apps
cd ${BUILD_DIR}
# Build rpcProxy (required by CTS Scenario Driver to call Corda 3.0 which continues to use Kryo for RPC)
./gradlew testing:qa:behave:tools:rpc-proxy:rpcProxyJar
cp -v $(ls testing/qa/behave/tools/rpc-proxy/build/libs/corda-rpcProxy*.jar | tail -n1) ${STAGING_DIR}/corda-rpcProxy.jar
cp -v testing/qa/behave/tools/rpc-proxy/startRPCproxy.sh ${STAGING_DIR}

View File

@ -0,0 +1,85 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
group 'net.corda.behave.tools'
apply plugin: 'java'
apply plugin: 'kotlin'
apply plugin: 'maven-publish'
configurations {
smokeTestCompile.extendsFrom testCompile
smokeTestRuntime.extendsFrom testRuntime
}
sourceSets {
rpcProxy {
kotlin {
srcDir "src/main/kotlin"
}
resources {
srcDir "config/dev"
}
}
smokeTest {
kotlin {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/smoke-test/kotlin')
}
}
}
dependencies {
compile project(':test-utils')
compile "net.corda.plugins:cordform-common:$gradle_plugins_version"
// Integration test helpers
testCompile "org.assertj:assertj-core:$assertj_version"
testCompile "junit:junit:$junit_version"
compile project(':finance')
compileOnly project(':node-api')
compile project(':core')
compile project(':client:rpc')
// includes jetty/jersey dependencies used by RPCProxyServer
compile project(':webserver')
// Jetty http server
rpcProxyCompile "org.eclipse.jetty:jetty-servlet:$jetty_version"
rpcProxyCompile "org.eclipse.jetty:jetty-webapp:$jetty_version"
rpcProxyCompile "javax.servlet:javax.servlet-api:3.1.0"
// Jersey for JAX-RS implementation for use in Jetty
rpcProxyCompile "org.glassfish.jersey.core:jersey-server:$jersey_version"
rpcProxyCompile "org.glassfish.jersey.containers:jersey-container-servlet-core:$jersey_version"
rpcProxyCompile "org.glassfish.jersey.containers:jersey-container-jetty-http:$jersey_version"
}
task smokeTest(type: Test) {
testClassesDirs = sourceSets.smokeTest.output.classesDirs
classpath = sourceSets.smokeTest.runtimeClasspath
}
task rpcProxyJar(type: Jar) {
baseName "corda-rpcProxy"
from {
configurations.rpcProxyRuntime.collect { it.isDirectory() ? it : zipTree(it) }
}
with jar
exclude("META-INF/*.DSA")
exclude("META-INF/*.RSA")
exclude("META-INF/*.SF")
manifest {
attributes 'Main-Class': 'net.corda.behave.service.proxy.RPCProxyServerKt'
}
}

View File

@ -0,0 +1,241 @@
package net.corda.behave.service.proxy
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.ContractState
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.openHttpConnection
import net.corda.core.internal.responseAs
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import java.io.InputStream
import java.security.PublicKey
import java.time.Instant
class CordaRPCProxyClient(private val targetHostAndPort: NetworkHostAndPort) : CordaRPCOps {
companion object {
val log = contextLogger()
}
init {
try {
KryoClientSerializationScheme.initialiseSerialization()
} catch (e: Exception) { log.warn("Kryo RPC Client serialization already initialised.")}
}
override fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> {
val flowName = logicType.name
val argList = listOf(flowName, *args)
log.info("Corda RPC Proxy client calling: $flowName with values: $argList")
val response = doPost<Any>(targetHostAndPort, "start-flow", argList.serialize().bytes)
val result = doneFuture(response)
return FlowHandleImpl(StateMachineRunId.createRandom(), result) as FlowHandle<T>
}
override fun nodeInfo(): NodeInfo {
return doGet(targetHostAndPort, "node-info")
}
override fun notaryIdentities(): List<Party> {
return doGet(targetHostAndPort, "notary-identities")
}
override fun <T : ContractState> vaultQuery(contractStateType: Class<out T>): Vault.Page<T> {
return doPost(targetHostAndPort, "vault-query", contractStateType.name.serialize().bytes)
}
override fun networkMapSnapshot(): List<NodeInfo> {
return doGet(targetHostAndPort, "network-map-snapshot")
}
override fun stateMachinesSnapshot(): List<StateMachineInfo> {
TODO("not implemented")
}
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
TODO("not implemented")
}
override fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): Vault.Page<T> {
TODO("not implemented")
}
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>): Vault.Page<T> {
TODO("not implemented")
}
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): Vault.Page<T> {
TODO("not implemented")
}
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): Vault.Page<T> {
TODO("not implemented")
}
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
TODO("not implemented")
}
override fun <T : ContractState> vaultTrack(contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
TODO("not implemented")
}
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria): DataFeed<Vault.Page<T>, Vault.Update<T>> {
TODO("not implemented")
}
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): DataFeed<Vault.Page<T>, Vault.Update<T>> {
TODO("not implemented")
}
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): DataFeed<Vault.Page<T>, Vault.Update<T>> {
TODO("not implemented")
}
override fun internalVerifiedTransactionsSnapshot(): List<SignedTransaction> {
TODO("not implemented")
}
override fun internalVerifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
TODO("not implemented")
}
override fun stateMachineRecordedTransactionMappingSnapshot(): List<StateMachineTransactionMapping> {
TODO("not implemented")
}
override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
TODO("not implemented")
}
override fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> {
TODO("not implemented")
}
override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
TODO("not implemented")
}
override fun acceptNewNetworkParameters(parametersHash: SecureHash) {
TODO("not implemented")
}
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
TODO("not implemented")
}
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) {
TODO("not implemented")
}
override fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String> {
TODO("not implemented")
}
override fun attachmentExists(id: SecureHash): Boolean {
TODO("not implemented")
}
override fun openAttachment(id: SecureHash): InputStream {
TODO("not implemented")
}
override fun uploadAttachment(jar: InputStream): SecureHash {
TODO("not implemented")
}
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash {
TODO("not implemented")
}
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
TODO("not implemented")
}
override fun currentNodeTime(): Instant {
TODO("not implemented")
}
override fun waitUntilNetworkReady(): CordaFuture<Void?> {
TODO("not implemented")
}
override fun wellKnownPartyFromAnonymous(party: AbstractParty): Party? {
TODO("not implemented")
}
override fun partyFromKey(key: PublicKey): Party? {
TODO("not implemented")
}
override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name): Party? {
TODO("not implemented")
}
override fun notaryPartyFromX500Name(x500Name: CordaX500Name): Party? {
TODO("not implemented")
}
override fun partiesFromName(query: String, exactMatch: Boolean): Set<Party> {
return doPost(targetHostAndPort, "parties-from-name", query.serialize().bytes)
}
override fun registeredFlows(): List<String> {
return doGet(targetHostAndPort, "registered-flows")
}
override fun nodeInfoFromParty(party: AbstractParty): NodeInfo? {
TODO("not implemented")
}
override fun clearNetworkMapCache() {
TODO("not implemented")
}
override fun setFlowsDrainingModeEnabled(enabled: Boolean) {
TODO("not implemented")
}
override fun isFlowsDrainingModeEnabled(): Boolean {
TODO("not implemented")
}
override fun shutdown() {
TODO("not implemented")
}
override fun killFlow(id: StateMachineRunId): Boolean {
TODO("not implemented")
}
private inline fun <reified T : Any> doPost(hostAndPort: NetworkHostAndPort, path: String, payload: ByteArray) : T {
val url = java.net.URL("http://$hostAndPort/rpc/$path")
val connection = url.openHttpConnection().apply {
doOutput = true
requestMethod = "POST"
setRequestProperty("Content-Type", javax.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM)
outputStream.write(payload)
}
return connection.responseAs()
}
private inline fun <reified T : Any> doGet(hostAndPort: NetworkHostAndPort, path: String): T {
return java.net.URL("http://$hostAndPort/rpc/$path").openHttpConnection().responseAs()
}
}

View File

@ -0,0 +1,93 @@
package net.corda.behave.service.proxy
import net.corda.behave.service.proxy.RPCProxyServer.Companion.initialiseSerialization
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.ServerConnector
import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.servlet.ServletContainer
import java.io.Closeable
import java.net.InetSocketAddress
class RPCProxyServer(hostAndPort: NetworkHostAndPort, val webService: RPCProxyWebService) : Closeable {
fun start(): Boolean {
log.info("Starting RPC Proxy web services...")
try {
buildServletContextHandler()
server.start()
}
catch(e: Exception) {
log.info("Failed to start RPC Proxy server: ${e.message}")
return false
}
log.info("RPC Proxy web services started on $hostAndPort with ${webService.javaClass.simpleName}}")
return true
}
override fun close() {
log.info("Shutting down RPC Proxy web services...")
server.stop()
server.join()
}
private val server: Server = Server(InetSocketAddress(hostAndPort.host, hostAndPort.port)).apply {
handler = HandlerCollection().apply {
addHandler(buildServletContextHandler())
}
}
private val hostAndPort: NetworkHostAndPort
get() = server.connectors.mapNotNull { it as? ServerConnector }
.map { NetworkHostAndPort(it.host, it.localPort) }
.first()
private fun buildServletContextHandler(): ServletContextHandler {
return ServletContextHandler().apply {
contextPath = "/"
val resourceConfig = ResourceConfig().apply {
// Add your API provider classes (annotated for JAX-RS) here
register(webService)
}
val jerseyServlet = ServletHolder(ServletContainer(resourceConfig)).apply { initOrder = 0 }// Initialise at server start
addServlet(jerseyServlet, "/*")
}
}
companion object {
val log = contextLogger()
fun initialiseSerialization() {
try {
nodeSerializationEnv =
SerializationEnvironmentImpl(
SerializationFactoryImpl().apply {
registerScheme(KryoClientSerializationScheme())
registerScheme(AMQPClientSerializationScheme(emptyList()))
},
AMQP_P2P_CONTEXT,
rpcClientContext = KRYO_RPC_CLIENT_CONTEXT)
}
catch(e: Exception) { log.warn("Skipping initialiseSerialization: ${e.message}") }
}
}
}
fun main(args: Array<String>) {
initialiseSerialization()
val portNo = args.singleOrNull() ?: throw IllegalArgumentException("Please specify a port number")
val hostAndPort = NetworkHostAndPort("localhost", portNo.toIntOrNull() ?: 13000)
println("Starting RPC Proxy Server on [$hostAndPort] ...")
RPCProxyServer(hostAndPort, webService = RPCProxyWebService(hostAndPort)).start()
}

View File

@ -0,0 +1,163 @@
package net.corda.behave.service.proxy
import net.corda.behave.service.proxy.RPCProxyWebService.Companion.RPC_PROXY_PATH
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.core.contracts.ContractState
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import java.io.InputStream
import javax.servlet.http.HttpServletRequest
import javax.ws.rs.Consumes
import javax.ws.rs.GET
import javax.ws.rs.POST
import javax.ws.rs.Path
import javax.ws.rs.core.Context
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
import javax.ws.rs.core.Response.status
@Path(RPC_PROXY_PATH)
class RPCProxyWebService(targetHostAndPort: NetworkHostAndPort) {
// see "NetworkInterface" port allocation definitions
private val targetPort = targetHostAndPort.port - 1000
init {
try {
effectiveSerializationEnv
} catch (e: IllegalStateException) {
try {
KryoClientSerializationScheme.initialiseSerialization()
} catch (e: IllegalStateException) {
// Race e.g. two of these constructed in parallel, ignore.
}
}
}
companion object {
private val log = contextLogger()
const val DEFAULT_PASSWORD = "S0meS3cretW0rd"
const val RPC_PROXY_PATH = "rpc"
}
@GET
@Path("my-ip")
fun myIp(@Context request: HttpServletRequest): Response {
return createResponse("HELLO! My ip is ${request.remoteHost}:${request.remotePort}")
}
@GET
@Path("node-info")
fun nodeInfo(@Context request: HttpServletRequest): Response {
log.info("nodeInfo")
return use {
it.nodeInfo()
}
}
@GET
@Path("registered-flows")
fun registeredFlows(@Context request: HttpServletRequest): Response {
log.info("registeredFlows")
return use {
it.registeredFlows()
}
}
@GET
@Path("notary-identities")
fun notaryIdentities(@Context request: HttpServletRequest): Response {
log.info("networkMapSnapshot")
return use {
it.notaryIdentities()
}
}
@GET
@Path("network-map-snapshot")
fun networkMapSnapshot(@Context request: HttpServletRequest): Response {
log.info("networkMapSnapshot")
return use {
it.networkMapSnapshot()
}
}
@POST
@Path("parties-from-name")
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
fun partiesFromName(input: InputStream): Response {
log.info("partiesFromName")
val queryName = input.readBytes().deserialize<String>()
return use {
it.partiesFromName(queryName, false)
}
}
@POST
@Path("vault-query")
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
fun vaultQuery(input: InputStream): Response {
log.info("vaultQuery")
val contractStateType = input.readBytes().deserialize<String>()
val clazz = Class.forName(contractStateType) as Class<ContractState>
return use {
log.info("Calling vaultQuery with: $clazz")
it.vaultQuery(clazz)
}
}
@POST
@Path("start-flow")
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
fun startFlow(input: InputStream): Response {
log.info("startFlow")
return use { rpcClient ->
val argsList = input.readBytes().deserialize<List<Any>>()
for (i in argsList.indices) {
log.info("$i: ${argsList[i]}")
}
val flowClass = Class.forName(argsList[0] as String) as Class<FlowLogic<*>>
val flowArgs = argsList.drop(1).toTypedArray()
log.info("Calling flow: $flowClass with arguments: ${flowArgs.asList()}")
rpcClient.startFlowDynamic(flowClass, *flowArgs).returnValue.getOrThrow()
}
}
private fun <T> use(action: (CordaRPCOps) -> T): Response {
val targetHost = NetworkHostAndPort("localhost", targetPort)
val config = object : CordaRPCClientConfiguration {
override val connectionMaxRetryInterval = 10.seconds
}
log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...")
return try {
CordaRPCClient(targetHost, config).use("corda", DEFAULT_PASSWORD) {
log.info("RPC connection to ${targetHost.host}:${targetHost.port} established")
val client = it.proxy
val result = action(client)
log.info("CordaRPCOps result: $result")
return createResponse(result)
}
} catch (e: Exception) {
log.warn("RPC Proxy request failed: ", e)
e.printStackTrace()
status(Response.Status.INTERNAL_SERVER_ERROR).encoding(e.message)
}.build()
}
private fun createResponse(payload: Any?): Response {
return if (payload != null) {
Response.ok(payload.serialize().bytes)
} else {
status(Response.Status.NOT_FOUND)
}.build()
}
}

View File

@ -0,0 +1,33 @@
package net.corda.behave.service.proxy
import net.corda.core.internal.checkOkResponse
import net.corda.core.internal.openHttpConnection
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import org.junit.Test
class RPCProxyServerTest {
private val rpcProxyHostAndPort = NetworkHostAndPort("localhost", 13000)
private val nodeHostAndPort = NetworkHostAndPort("localhost", 12000)
@Test
fun `execute RPCOp`() {
RPCProxyServer(rpcProxyHostAndPort,
webService = RPCProxyWebService(nodeHostAndPort)).use {
it.start()
it.doPost("rpcOps", OpaqueBytes.of(0).bytes)
}
}
private fun RPCProxyServer.doPost(path: String, payload: ByteArray) {
val url = java.net.URL("http://$rpcProxyHostAndPort/rpc/$path")
url.openHttpConnection().apply {
doOutput = true
requestMethod = "POST"
setRequestProperty("Content-Type", javax.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM)
outputStream.write(payload)
checkOkResponse()
}
}
}

View File

@ -0,0 +1,292 @@
package net.corda.behave.service.proxy
import net.corda.core.internal.openHttpConnection
import net.corda.core.internal.responseAs
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.POUNDS
import net.corda.finance.SWISS_FRANCS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashExitFlow
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import org.junit.Assert.assertTrue
import org.junit.Assert.fail
import org.junit.Test
class RPCProxyWebServiceTest {
/**
* client -> HTTPtoRPCProxy -> Corda Node
*/
private val hostAndPort = NetworkHostAndPort("localhost", 13002)
private val rpcProxyClient = CordaRPCProxyClient(hostAndPort)
private val hostAndPortB = NetworkHostAndPort("localhost", 13007)
private val rpcProxyClientB = CordaRPCProxyClient(hostAndPortB)
private val hostAndPortC = NetworkHostAndPort("localhost", 13012)
private val rpcProxyClientC = CordaRPCProxyClient(hostAndPortC)
@Test
fun myIp() {
val response = doGet<String>("my-ip")
println(response)
assertTrue(response.contains("My ip is"))
}
@Test
fun nodeInfo() {
val response = rpcProxyClient.nodeInfo()
println(response)
}
@Test
fun registeredFlows() {
val response = rpcProxyClient.registeredFlows()
println(response)
}
@Test
fun notaryIdentities() {
val response = rpcProxyClient.notaryIdentities()
println(response)
}
@Test
fun networkMapSnapshot() {
val response = rpcProxyClient.networkMapSnapshot()
println(response)
}
@Test
fun startFlowCashIssuePartyA() {
val notary = rpcProxyClient.notaryIdentities()[0]
val response = rpcProxyClient.startFlow(::CashIssueFlow, POUNDS(500), OpaqueBytes.of(1), notary)
val result = response.returnValue.getOrThrow().stx
println(result)
}
@Test
fun startFlowCashIssuePartyB() {
val notary = rpcProxyClientB.notaryIdentities()[0]
val response = rpcProxyClientB.startFlow(::CashIssueFlow, DOLLARS(1000), OpaqueBytes.of(1), notary)
val result = response.returnValue.getOrThrow().stx
println(result)
}
@Test
fun startFlowCashPaymentToPartyC() {
val recipient = rpcProxyClientB.partiesFromName("PartyC", false).first()
val response = rpcProxyClientB.startFlow(::CashPaymentFlow, DOLLARS(100), recipient)
val result = response.returnValue.getOrThrow().stx
println(result)
}
@Test
fun startFlowCashPaymentToPartyB() {
val recipient = rpcProxyClient.partiesFromName("PartyB", false).first()
val response = rpcProxyClient.startFlow(::CashPaymentFlow, POUNDS(250), recipient)
val result = response.returnValue.getOrThrow().stx
println(result)
}
@Test
fun startFlowCashPaymentToPartyA() {
val recipient = rpcProxyClientB.partiesFromName("PartyA", false).first()
val response = rpcProxyClientB.startFlow(::CashPaymentFlow, DOLLARS(500), recipient)
val result = response.returnValue.getOrThrow().stx
println(result)
}
@Test
fun startFlowCashExit() {
val response = rpcProxyClient.startFlow(::CashExitFlow, POUNDS(500), OpaqueBytes.of(1))
val result = response.returnValue.getOrThrow().stx
println(result)
}
@Test
fun startMultiPartyCashFlows() {
val notary = rpcProxyClient.notaryIdentities()[0]
// Party A issue 500 GBP
println("Party A issuing 500 GBP")
rpcProxyClient.startFlow(::CashIssueFlow, POUNDS(500), OpaqueBytes.of(1), notary).returnValue.getOrThrow()
// Party B issue 1000 USD
println("Party B issuing 1000 USD")
rpcProxyClientB.startFlow(::CashIssueFlow, DOLLARS(1000), OpaqueBytes.of(1), notary).returnValue.getOrThrow()
// Party A transfers 250 GBP to Party B
println("Party A transferring 250 GBP to Party B")
val partyB = rpcProxyClient.partiesFromName("PartyB", false).first()
rpcProxyClient.startFlow(::CashPaymentFlow, POUNDS(250), partyB).returnValue.getOrThrow()
// Party B transfers 500 USD to Party A
println("Party B transferring 500 USD to Party A")
val partyA = rpcProxyClientB.partiesFromName("PartyA", false).first()
rpcProxyClientB.startFlow(::CashPaymentFlow, DOLLARS(500), partyA).returnValue.getOrThrow()
// Party B transfers back 125 GBP to Party A
println("Party B transferring back 125 GBP to Party A")
rpcProxyClientB.startFlow(::CashPaymentFlow, POUNDS(125), partyA).returnValue.getOrThrow()
// Party A transfers back 250 USD to Party B
println("Party A transferring back 250 USD to Party B")
rpcProxyClient.startFlow(::CashPaymentFlow, DOLLARS(250), partyB).returnValue.getOrThrow()
// Query the Vault of each respective Party
val responseA = rpcProxyClient.vaultQuery(Cash.State::class.java)
responseA.states.forEach { state ->
println("PartyA: ${state.state.data.amount}")
}
val responseB = rpcProxyClientB.vaultQuery(Cash.State::class.java)
responseB.states.forEach { state ->
println("PartyB: ${state.state.data.amount}")
}
}
@Test
fun startMultiABCPartyCashFlows() {
val notary = rpcProxyClient.notaryIdentities()[0]
while(true) {
// Party A issue 500 GBP
println("Party A issuing 500 GBP")
rpcProxyClient.startFlow(::CashIssueFlow, POUNDS(500), OpaqueBytes.of(1), notary).returnValue.getOrThrow()
// Party B issue 500 USD
println("Party B issuing 500 USD")
rpcProxyClientB.startFlow(::CashIssueFlow, DOLLARS(500), OpaqueBytes.of(1), notary).returnValue.getOrThrow()
// Party C issue 500 CHF
println("Party C issuing 500 CHF")
rpcProxyClientC.startFlow(::CashIssueFlow, SWISS_FRANCS(500), OpaqueBytes.of(1), notary).returnValue.getOrThrow()
// Party A transfers 250 GBP to Party B who transfers to party C
println("Party A transferring 250 GBP to Party B")
val partyB = rpcProxyClient.partiesFromName("PartyB", false).first()
rpcProxyClient.startFlow(::CashPaymentFlow, POUNDS(250), partyB).returnValue.getOrThrow()
println(" ... and forwarding to Party C")
val partyC = rpcProxyClientB.partiesFromName("PartyC", false).first()
rpcProxyClientB.startFlow(::CashPaymentFlow, POUNDS(250), partyC).returnValue.getOrThrow()
// Party B transfers 500 USD to Party C who transfers to party A
println("Party B transferring 500 USD to Party C")
rpcProxyClientB.startFlow(::CashPaymentFlow, DOLLARS(250), partyC).returnValue.getOrThrow()
println(" ... and forwarding to Party A")
val partyA = rpcProxyClientC.partiesFromName("PartyA", false).first()
rpcProxyClientC.startFlow(::CashPaymentFlow, DOLLARS(250), partyA).returnValue.getOrThrow()
// Party C transfers 550 CHF to Party A who transfers to party B
println("Party C transferring 250 CHF to Party A")
rpcProxyClientC.startFlow(::CashPaymentFlow, SWISS_FRANCS(250), partyA).returnValue.getOrThrow()
println(" ... and forwarding to Party B")
rpcProxyClient.startFlow(::CashPaymentFlow, SWISS_FRANCS(250), partyB).returnValue.getOrThrow()
// Query the Vault of each respective Party
val responseA = rpcProxyClient.vaultQuery(Cash.State::class.java)
responseA.states.forEach { state ->
println("PartyA: ${state.state.data.amount}")
}
val responseB = rpcProxyClientB.vaultQuery(Cash.State::class.java)
responseB.states.forEach { state ->
println("PartyB: ${state.state.data.amount}")
}
val responseC = rpcProxyClientC.vaultQuery(Cash.State::class.java)
responseC.states.forEach { state ->
println("PartyC: ${state.state.data.amount}")
}
println("============================================================================================")
}
}
// enable Flow Draining on Node B
@Test
fun startMultiACPartyCashFlows() {
val notary = rpcProxyClient.notaryIdentities()[0]
while(true) {
// Party A issue 500 GBP
println("Party A issuing 500 GBP")
rpcProxyClient.startFlow(::CashIssueFlow, POUNDS(500), OpaqueBytes.of(1), notary).returnValue.getOrThrow()
// Party C issue 500 CHF
println("Party C issuing 500 CHF")
rpcProxyClientC.startFlow(::CashIssueFlow, SWISS_FRANCS(500), OpaqueBytes.of(1), notary).returnValue.getOrThrow()
// Party A transfers 250 GBP to Party B who transfers to party C
println("Party A transferring 250 GBP to Party B")
val partyB = rpcProxyClient.partiesFromName("PartyB", false).first()
rpcProxyClient.startFlow(::CashPaymentFlow, POUNDS(250), partyB) // BLOCKS!!!!!
println(" ... and forwarding to Party A")
val partyA = rpcProxyClientC.partiesFromName("PartyA", false).first()
rpcProxyClientC.startFlow(::CashPaymentFlow, DOLLARS(250), partyA).returnValue.getOrThrow()
// Party C transfers 550 CHF to Party A who transfers to party B
println("Party C transferring 250 CHF to Party A")
rpcProxyClientC.startFlow(::CashPaymentFlow, SWISS_FRANCS(250), partyA).returnValue.getOrThrow()
println(" ... and forwarding to Party B")
rpcProxyClient.startFlow(::CashPaymentFlow, SWISS_FRANCS(250), partyB)
// Query the Vault of each respective Party
val responseA = rpcProxyClient.vaultQuery(Cash.State::class.java)
responseA.states.forEach { state ->
println("PartyA: ${state.state.data.amount}")
}
val responseB = rpcProxyClientB.vaultQuery(Cash.State::class.java)
responseB.states.forEach { state ->
println("PartyB: ${state.state.data.amount}")
}
val responseC = rpcProxyClientC.vaultQuery(Cash.State::class.java)
responseC.states.forEach { state ->
println("PartyC: ${state.state.data.amount}")
}
println("============================================================================================")
}
}
@Test
fun vaultQueryCash() {
try {
val responseA = rpcProxyClient.vaultQuery(Cash.State::class.java)
responseA.states.forEach { state ->
println("PartyA: ${state.state.data.amount}")
}
val responseB = rpcProxyClientB.vaultQuery(Cash.State::class.java)
responseB.states.forEach { state ->
println("PartyB: ${state.state.data.amount}")
}
val responseC = rpcProxyClientC.vaultQuery(Cash.State::class.java)
responseC.states.forEach { state ->
println("PartyC: ${state.state.data.amount}")
}
}
catch (e: Exception) {
println("Vault Cash query error: ${e.message}")
fail()
}
}
private inline fun <reified T : Any> doGet(path: String): T {
return java.net.URL("http://$hostAndPort/rpc/$path").openHttpConnection().responseAs()
}
}

View File

@ -0,0 +1,42 @@
#!/bin/sh
DISTRO_DIR=$1
PORT="${2:-13000}"
if [ ! -d "$DISTRO_DIR" ]; then
echo "Must specify location of Corda distribution (directory does not exist: $DISTRO_DIR)"
exit 1
fi
if [ ! -f "$DISTRO_DIR/corda.jar" ]; then
echo "Distribution corda.jar not found"
exit 1
fi
if [ ! -f "$DISTRO_DIR/corda-rpcProxy.jar" ]; then
echo "Distribution corda-rpcProxy.jar not found"
exit 1
fi
# unzip corda jars into proxy directory (if not already there)
if [ ! -d "$DISTRO_DIR/proxy" ]; then
mkdir -p ${DISTRO_DIR}/proxy
unzip ${DISTRO_DIR}/corda.jar -d ${DISTRO_DIR}/proxy
fi
# launch proxy
echo "Launching RPC proxy ..."
echo "java -cp $DISTRO_DIR/corda-rpcProxy.jar:\
\n\t$(ls ${DISTRO_DIR}/proxy/*.jar | tr '\n' ':'):\
\n\t$(ls ${DISTRO_DIR}/apps/*.jar | tr '\n' ':')
\
\n\tnet.corda.behave.service.proxy.RPCProxyServerKt ${PORT}
"
/usr/bin/java -cp ${DISTRO_DIR}/corda-rpcProxy.jar:\
$(ls ${DISTRO_DIR}/proxy/*.jar | tr '\n' ':'):\
$(ls ${DISTRO_DIR}/apps/*.jar | tr '\n' ':') \
net.corda.behave.service.proxy.RPCProxyServerKt ${PORT} &> rpcproxy-${PORT}.log &
echo $! > /tmp/rpcProxy-pid-${PORT}
echo "RPCProxyServer PID: $(cat /tmp/rpcProxy-pid-${PORT})"