Merge pull request #180 from corda/clint-webserversplit

Split webserver from node
This commit is contained in:
Clinton 2017-02-01 12:13:31 +00:00 committed by GitHub
commit d376a902ad
51 changed files with 665 additions and 624 deletions

View File

@ -141,7 +141,6 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['build']) {
nearestCity "London"
advertisedServices = ["corda.notary.validating"]
artemisPort 10002
webPort 10003
cordapps = []
}
node {

View File

@ -102,6 +102,10 @@ interface CordaRPCOps : RPCOps {
*/
fun uploadAttachment(jar: InputStream): SecureHash
@Suppress("DEPRECATION")
@Deprecated("This service will be removed in a future milestone")
fun uploadFile(dataType: String, name: String?, file: InputStream): String
/**
* Returns the node-local current time.
*/

View File

@ -10,6 +10,8 @@ import net.corda.core.toFuture
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import rx.Observable
import java.io.File
import java.io.InputStream
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
@ -216,6 +218,24 @@ interface KeyManagementService {
fun freshKey(): KeyPair
}
// TODO: Move to a more appropriate location
/**
* An interface that denotes a service that can accept file uploads.
*/
interface FileUploader {
/**
* Accepts the data in the given input stream, and returns some sort of useful return message that will be sent
* back to the user in the response.
*/
fun upload(file: InputStream): String
/**
* Check if this service accepts this type of upload. For example if you are uploading interest rates this could
* be "my-service-interest-rates". Type here does not refer to file extentions or MIME types.
*/
fun accepts(type: String): Boolean
}
/**
* A sketch of an interface to a simple key/value storage system. Intended for persistence of simple blobs like
* transactions, serialised flow state machines and so on. Again, this isn't intended to imply lack of SQL or
@ -232,6 +252,10 @@ interface StorageService {
/** Provides access to storage of arbitrary JAR files (which may contain only data, no code). */
val attachments: AttachmentStorage
@Suppress("DEPRECATION")
@Deprecated("This service will be removed in a future milestone")
val uploaders: List<FileUploader>
val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage
}

View File

@ -82,13 +82,16 @@ path to the node's base directory.
:messagingServerAddress: The address of the ArtemisMQ broker instance. If not provided the node will run one locally.
:webAddress: The host and port on which the node is available for web operations.
:webAddress: The host and port on which the bundled webserver will listen if it is started.
.. note:: If HTTPS is enabled then the browser security checks will require that the accessing url host name is one
of either the machine name, fully qualified machine name, or server IP address to line up with the Subject Alternative
Names contained within the development certificates. This is addition to requiring the ``/config/dev/corda_dev_ca.cer``
root certificate be installed as a Trusted CA.
.. note:: The driver will not automatically create a webserver instance, but the Cordformation will. If this field
is present the web server will start.
:extraAdvertisedServiceIds: A list of ServiceType id strings to be advertised to the NetworkMapService and thus be available
when other nodes query the NetworkMapCache for supporting nodes. This can also include plugin services loaded from .jar
files in the plugins folder. Optionally, a custom advertised service name can be provided by appending it to the service

View File

@ -34,21 +34,16 @@ of the node internal subsystems.
extensions to be created, or registered at startup. In particular:
a. The ``webApis`` property is a list of JAX-RS annotated REST access
classes. These classes will be constructed by the embedded web server
and must have a single argument constructor taking a ``ServiceHub``
reference. This reference provides access to functions such as querying
for states through the ``VaultService`` interface, or access to the
``NetworkMapCache`` to identify services on remote nodes. The framework will
provide a database transaction in scope during the lifetime of the web
call, so full access to database data is valid. Unlike
``servicePlugins`` the ``webApis`` cannot register new protocols, or
initiate threads. (N.B. The intent is to move the Web support into a
separate helper process using the RPC mechanism to control access.)
classes. These classes will be constructed by the bundled web server
and must have a single argument constructor taking a ``CordaRPCOps``
reference. This will allow it to communicate with the node process
via the RPC interface. These web APIs will not be available if the
bundled web server is not started.
b. The ``staticServeDirs`` property maps static web content to virtual
paths and allows simple web demos to be distributed within the CorDapp
jars. (N.B. The intent is to move the Web support into a separate helper
process using the RPC mechanism to control access.)
jars. These static serving directories will not be available if the
bundled web server is not started.
c. The ``requiredFlows`` property is used to declare new protocols in
the plugin jar. Specifically the property must return a map with a key

View File

@ -12,10 +12,10 @@ App plugins
To create an app plugin you must you must extend from `CordaPluginRegistry`_. The JavaDoc contains
specific details of the implementation, but you can extend the server in the following ways:
1. Required flows: Specify which flows will be whitelisted for use in your web APIs.
1. Required flows: Specify which flows will be whitelisted for use in your RPC calls.
2. Service plugins: Register your services (see below).
3. Web APIs: You may register your own endpoints under /api/ of the built-in web server.
4. Static web endpoints: You may register your own static serving directories for serving web content.
3. Web APIs: You may register your own endpoints under /api/ of the bundled web server.
4. Static web endpoints: You may register your own static serving directories for serving web content from the web server.
5. Registering your additional classes used in RPC.
Services

View File

@ -1,8 +1,9 @@
Node administration
===================
When a node is running, it exposes an embedded database server, an embedded web server that lets you monitor it,
you can upload and download attachments, access a REST API and so on.
When a node is running, it exposes an RPC interface that lets you monitor it,
you can upload and download attachments, access a REST API and so on. A bundled
Jetty web server exposes the same interface over HTTP.
Logging
-------

View File

@ -346,3 +346,11 @@ external legacy systems by insertion of unpacked data into existing
tables. To enable these features the contract state must implement the
``QueryableState`` interface to define the mappings.
Node Web Server
---------------
A web server comes bundled with the node by default, but is not started
automatically. This web server exposes both RPC backed API calls and
static content serving. The web server is not automatically started,
you must explicitly start it in the node driver or define a web port
in your `Cordformation`_ configuration.

View File

@ -45,7 +45,8 @@ The most important fields regarding network configuration are:
* ``artemisAddress``: This specifies a host and port. Note that the address bound will **NOT** be ``my-corda-node``,
but rather ``::`` (all addresses on all interfaces). The hostname specified is the hostname *that must be externally
resolvable by other nodes in the network*. In the above configuration this is the resolvable name of a machine in a vpn.
* ``webAddress``: The address the webserver should bind. Note that the port should be distinct from that of ``artemisAddress``.
* ``webAddress``: The address the webserver should bind. Note that the port should be distinct from that of ``artemisAddress``
if they are on the same machine.
* ``networkMapService``: Details of the node running the network map service. If it's this node that's running the service
then this field must not be specified.

View File

@ -24,6 +24,16 @@ if which osascript >/dev/null; then
first=false
fi
done
for dir in `ls`; do
if [ -d $dir ]; then
cmd="bash -c 'cd $rootdir/$dir; /usr/libexec/java_home -v 1.8 --exec java -jar JAR_NAME --webserver && exit'"
script="$script
tell application \"System Events\" to tell process \"Terminal\" to keystroke \"t\" using command down
delay 0.5
do script \"$cmd\" in window 1"
first=false
fi
done
script="$script
end tell"
osascript -e "$script"
@ -39,6 +49,7 @@ else
if [ -d $dir ]; then
pushd $dir >/dev/null
xterm -T "`basename $dir`" -e 'java -jar JAR_NAME' &
xterm -T "`basename $dir`" -e 'java -jar JAR_NAME --webserver' &
popd >/dev/null
fi
done

View File

@ -6,6 +6,7 @@ Pushd %~dp0
FOR /D %%G in (.\*) DO (
Pushd %%G
start java -jar corda.jar
start java -jar corda.jar --webserver
Popd
)

View File

@ -151,6 +151,9 @@ dependencies {
compile 'io.atomix.copycat:copycat-server:1.1.4'
compile 'io.atomix.catalyst:catalyst-netty:1.1.1'
// OkHTTP: Simple HTTP library.
compile "com.squareup.okhttp3:okhttp:$okhttp_version"
// Integration test helpers
integrationTestCompile "junit:junit:$junit_version"

View File

@ -69,6 +69,10 @@ task buildCordaJAR(type: FatCapsule, dependsOn: ['buildCertSigningRequestUtility
// If you change these flags, please also update Driver.kt
jvmArgs = ['-Xmx200m', '-XX:+UseG1GC']
}
manifest {
attributes('Corda-Version': corda_version)
}
}
task buildCertSigningRequestUtilityJAR(type: FatCapsule) {

View File

@ -1,5 +1,6 @@
package net.corda.node.driver
import com.google.common.net.HostAndPort
import net.corda.core.getOrThrow
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
@ -25,6 +26,14 @@ class DriverTests {
// Check that the port is bound
addressMustNotBeBound(executorService, hostAndPort)
}
fun webserverMustBeUp(webserverAddr: HostAndPort) {
addressMustBeBound(executorService, webserverAddr)
}
fun webserverMustBeDown(webserverAddr: HostAndPort) {
addressMustNotBeBound(executorService, webserverAddr)
}
}
@Test
@ -60,4 +69,15 @@ class DriverTests {
}
nodeMustBeDown(nodeInfo.nodeInfo)
}
@Test
fun `starting a node and independent web server works`() {
val addr = driver {
val node = startNode("test").getOrThrow()
val webserverAddr = startWebserver(node).getOrThrow()
webserverMustBeUp(webserverAddr)
webserverAddr
}
webserverMustBeDown(addr)
}
}

View File

@ -21,6 +21,7 @@ class ArgsParser {
.withRequiredArg()
.defaultsTo("node.conf")
private val logToConsoleArg = optionParser.accepts("log-to-console", "If set, prints logging to the console as well as to a file.")
private val isWebserverArg = optionParser.accepts("webserver")
private val helpArg = optionParser.accepts("help").forHelp()
fun parse(vararg args: String): CmdLineOptions {
@ -30,13 +31,14 @@ class ArgsParser {
}
val baseDirectory = Paths.get(optionSet.valueOf(baseDirectoryArg)).normalize().toAbsolutePath()
val configFile = baseDirectory / optionSet.valueOf(configFileArg)
return CmdLineOptions(baseDirectory, configFile, optionSet.has(helpArg), optionSet.has(logToConsoleArg))
val isWebserver = optionSet.has(isWebserverArg)
return CmdLineOptions(baseDirectory, configFile, optionSet.has(helpArg), optionSet.has(logToConsoleArg), isWebserver)
}
fun printHelp(sink: PrintStream) = optionParser.printHelpOn(sink)
}
data class CmdLineOptions(val baseDirectory: Path, val configFile: Path?, val help: Boolean, val logToConsole: Boolean) {
data class CmdLineOptions(val baseDirectory: Path, val configFile: Path?, val help: Boolean, val logToConsole: Boolean, val isWebserver: Boolean) {
fun loadConfig(allowMissingConfig: Boolean = false, configOverrides: Map<String, Any?> = emptyMap()): Config {
return ConfigHelper.loadConfig(baseDirectory, configFile, allowMissingConfig, configOverrides)
}

View File

@ -7,6 +7,7 @@ import net.corda.core.utilities.Emoji
import net.corda.node.internal.Node
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.utilities.ANSIProgressObserver
import net.corda.node.webserver.WebServer
import org.fusesource.jansi.Ansi
import org.fusesource.jansi.AnsiConsole
import org.slf4j.LoggerFactory
@ -57,7 +58,8 @@ fun main(args: Array<String>) {
drawBanner()
System.setProperty("log-path", (cmdlineOptions.baseDirectory / "logs").toString())
val logDir = if (cmdlineOptions.isWebserver) "logs/web" else "logs"
System.setProperty("log-path", (cmdlineOptions.baseDirectory / logDir).toString())
val log = LoggerFactory.getLogger("Main")
printBasicNodeInfo("Logs can be found in", System.getProperty("log-path"))
@ -78,33 +80,39 @@ fun main(args: Array<String>) {
log.info("VM ${info.vmName} ${info.vmVendor} ${info.vmVersion}")
log.info("Machine: ${InetAddress.getLocalHost().hostName}")
log.info("Working Directory: ${cmdlineOptions.baseDirectory}")
if(cmdlineOptions.isWebserver) {
log.info("Starting as webserver on ${conf.webAddress}")
} else {
log.info("Starting as node on ${conf.artemisAddress}")
}
try {
cmdlineOptions.baseDirectory.createDirectories()
val node = conf.createNode()
node.start()
printPluginsAndServices(node)
// TODO: Webserver should be split and start from inside a WAR container
if (!cmdlineOptions.isWebserver) {
val node = conf.createNode()
node.start()
printPluginsAndServices(node)
thread {
Thread.sleep(30.seconds.toMillis())
while (!node.networkMapRegistrationFuture.isDone) {
printBasicNodeInfo("Waiting for response from network map ...")
Thread.sleep(30.seconds.toMillis())
node.networkMapRegistrationFuture.success {
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
printBasicNodeInfo("Node started up and registered in $elapsed sec")
if (renderBasicInfoToConsole)
ANSIProgressObserver(node.smm)
} failure {
log.error("Error during network map registration", it)
exitProcess(1)
}
}
node.networkMapRegistrationFuture.success {
node.run()
} else {
val server = WebServer(conf)
server.start()
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
printBasicNodeInfo("Node started up and registered in $elapsed sec")
if (renderBasicInfoToConsole)
ANSIProgressObserver(node.smm)
} failure {
log.error("Error during network map registration", it)
exitProcess(1)
printBasicNodeInfo("Webserver started up in $elapsed sec")
server.run()
}
node.run()
} catch (e: Exception) {
log.error("Exception during node startup", e)
exitProcess(1)

View File

@ -1,153 +0,0 @@
package net.corda.node.api
import net.corda.core.contracts.*
import net.corda.node.api.StatesQuery
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import java.time.Instant
import java.time.LocalDateTime
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
/**
* Top level interface to external interaction with the distributed ledger.
*
* Wherever a list is returned by a fetchXXX method that corresponds with an input list, that output list will have optional elements
* where a null indicates "missing" and the elements returned will be in the order corresponding with the input list.
*
*/
@Path("")
interface APIServer {
/**
* Report current UTC time as understood by the platform.
*/
@GET
@Path("servertime")
@Produces(MediaType.APPLICATION_JSON)
fun serverTime(): LocalDateTime
/**
* Report whether this node is started up or not.
*/
@GET
@Path("status")
@Produces(MediaType.TEXT_PLAIN)
fun status(): Response
/**
* Report this node's configuration and identities.
* Currently tunnels the NodeInfo as an encoding of the Kryo serialised form.
* TODO this functionality should be available via the RPC
*/
@GET
@Path("info")
@Produces(MediaType.APPLICATION_JSON)
fun info(): NodeInfo
/**
* Query your "local" states (containing only outputs involving you) and return the hashes & indexes associated with them
* to probably be later inflated by fetchLedgerTransactions() or fetchStates() although because immutable you can cache them
* to avoid calling fetchLedgerTransactions() many times.
*
* @param query Some "where clause" like expression.
* @return Zero or more matching States.
*/
fun queryStates(query: StatesQuery): List<StateRef>
fun fetchStates(states: List<StateRef>): Map<StateRef, TransactionState<ContractState>?>
/**
* Query for immutable transactions (results can be cached indefinitely by their id/hash).
*
* @param txs The hashes (from [StateRef.txhash] returned from [queryStates]) you would like full transactions for.
* @return null values indicate missing transactions from the requested list.
*/
fun fetchTransactions(txs: List<SecureHash>): Map<SecureHash, SignedTransaction?>
/**
* TransactionBuildSteps would be invocations of contract.generateXXX() methods that all share a common TransactionBuilder
* and a common contract type (e.g. Cash or CommercialPaper)
* which would automatically be passed as the first argument (we'd need that to be a criteria/pattern of the generateXXX methods).
*/
fun buildTransaction(type: ContractDefRef, steps: List<TransactionBuildStep>): SerializedBytes<WireTransaction>
/**
* Generate a signature for this transaction signed by us.
*/
fun generateTransactionSignature(tx: SerializedBytes<WireTransaction>): DigitalSignature.WithKey
/**
* Attempt to commit transaction (returned from build transaction) with the necessary signatures for that to be
* successful, otherwise exception is thrown.
*/
fun commitTransaction(tx: SerializedBytes<WireTransaction>, signatures: List<DigitalSignature.WithKey>): SecureHash
/**
* This method would not return until the flow is finished (hence the "Sync").
*
* Longer term we'd add an Async version that returns some kind of FlowInvocationRef that could be queried and
* would appear on some kind of event message that is broadcast informing of progress.
*
* Will throw exception if flow fails.
*/
fun invokeFlowSync(type: FlowRef, args: Map<String, Any?>): Any?
// fun invokeFlowAsync(type: FlowRef, args: Map<String, Any?>): FlowInstanceRef
/**
* Fetch flows that require a response to some prompt/question by a human (on the "bank" side).
*/
fun fetchFlowsRequiringAttention(query: StatesQuery): Map<StateRef, FlowRequiringAttention>
/**
* Provide the response that a flow is waiting for.
*
* @param flow Should refer to a previously supplied FlowRequiringAttention.
* @param stepId Which step of the flow are we referring too.
* @param choice Should be one of the choices presented in the FlowRequiringAttention.
* @param args Any arguments required.
*/
fun provideFlowResponse(flow: FlowInstanceRef, choice: SecureHash, args: Map<String, Any?>)
}
/**
* Encapsulates the contract type. e.g. Cash or CommercialPaper etc.
*/
interface ContractDefRef {
}
data class ContractClassRef(val className: String) : ContractDefRef
data class ContractLedgerRef(val hash: SecureHash) : ContractDefRef
/**
* Encapsulates the flow to be instantiated. e.g. TwoPartyTradeFlow.Buyer.
*/
interface FlowRef {
}
data class FlowClassRef(val className: String) : FlowRef
data class FlowInstanceRef(val flowInstance: SecureHash, val flowClass: FlowClassRef, val flowStepId: String)
/**
* Thinking that Instant is OK for short lived flow deadlines.
*/
data class FlowRequiringAttention(val ref: FlowInstanceRef, val prompt: String, val choiceIdsToMessages: Map<SecureHash, String>, val dueBy: Instant)
/**
* Encapsulate a generateXXX method call on a contract.
*/
data class TransactionBuildStep(val generateMethodName: String, val args: Map<String, Any?>)

View File

@ -2,8 +2,6 @@
package net.corda.node.driver
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
@ -19,29 +17,32 @@ import net.corda.core.utilities.loggerFor
import net.corda.node.services.User
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.SSLConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.JsonSupport
import net.corda.node.utilities.ServiceIdentityGenerator
import okhttp3.OkHttpClient
import okhttp3.Request
import org.slf4j.Logger
import java.io.File
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.*
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
/**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
*
@ -84,6 +85,13 @@ interface DriverDSLExposedInterface {
type: ServiceType = RaftValidatingNotaryService.type,
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>>
/**
* Starts a web server for a node
*
* @param handle The handle for the node that this webserver connects to via RPC.
*/
fun startWebserver(handle: NodeHandle): ListenableFuture<HostAndPort>
fun waitForAllNodesToFinish()
}
@ -187,6 +195,7 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
return returnValue
} catch (exception: Throwable) {
println("Driver shutting down because of exception $exception")
exception.printStackTrace()
throw exception
} finally {
driverDsl.shutdown()
@ -318,24 +327,14 @@ open class DriverDSL(
executorService.shutdown()
}
private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? {
val url = URL("http://$webAddress/api/info")
try {
val conn = url.openConnection() as HttpURLConnection
conn.requestMethod = "GET"
if (conn.responseCode != 200) {
log.error("Received response code ${conn.responseCode} from $url during startup.")
return null
}
// For now the NodeInfo is tunneled in its Kryo format over the Node's Web interface.
val om = ObjectMapper()
val module = SimpleModule("NodeInfo")
module.addDeserializer(NodeInfo::class.java, JsonSupport.NodeInfoDeserializer)
om.registerModule(module)
return om.readValue(conn.inputStream, NodeInfo::class.java)
private fun queryNodeInfo(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): NodeInfo? {
while (true) try {
val client = CordaRPCClient(nodeAddress, sslConfig)
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
val rpcOps = client.proxy(timeout = Duration.of(15, ChronoUnit.SECONDS))
return rpcOps.nodeIdentity()
} catch(e: Exception) {
log.error("Could not query node info at $url due to an exception.", e)
return null
log.error("Retrying query node info at $nodeAddress")
}
}
@ -378,7 +377,7 @@ open class DriverDSL(
val startNode = startNode(executorService, configuration, quasarJarPath, debugPort)
registerProcess(startNode)
return startNode.map {
NodeHandle(queryNodeInfo(apiAddress)!!, configuration, it)
NodeHandle(queryNodeInfo(messagingAddress, configuration)!!, configuration, it)
}
}
@ -413,12 +412,39 @@ open class DriverDSL(
}
}
private fun queryWebserver(configuration: FullNodeConfiguration): HostAndPort? {
val protocol = if (configuration.useHTTPS) {
"https://"
} else {
"http://"
}
val url = URL(protocol + configuration.webAddress.toString() + "/api/status")
val client = OkHttpClient.Builder().connectTimeout(5, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build()
while (true) try {
val response = client.newCall(Request.Builder().url(url).build()).execute()
if (response.isSuccessful && (response.body().string() == "started")) {
return configuration.webAddress
}
} catch(e: ConnectException) {
log.debug("Retrying webserver info at ${configuration.webAddress}")
}
}
override fun startWebserver(handle: NodeHandle): ListenableFuture<HostAndPort> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
return future {
registerProcess(DriverDSL.startWebserver(executorService, handle.configuration, debugPort))
queryWebserver(handle.configuration)!!
}
}
override fun start() {
startNetworkMapService()
}
private fun startNetworkMapService(): ListenableFuture<Process> {
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val baseDirectory = driverDirectory / networkMapLegalName
@ -428,7 +454,6 @@ open class DriverDSL(
configOverrides = mapOf(
"myLegalName" to networkMapLegalName,
"artemisAddress" to networkMapAddress.toString(),
"webAddress" to apiAddress.toString(),
"extraAdvertisedServiceIds" to "",
"useTestClock" to useTestClock
)
@ -497,13 +522,37 @@ open class DriverDSL(
builder.inheritIO()
builder.directory(nodeConf.baseDirectory.toFile())
val process = builder.start()
return Futures.allAsList(
addressMustBeBound(executorService, nodeConf.artemisAddress),
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
// the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for
// the web api address to be bound as well, as that starts after the services. Needs rethinking.
addressMustBeBound(executorService, nodeConf.webAddress)
).map { process }
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
// the handlers for the advertised services are not yet registered. Needs rethinking.
return addressMustBeBound(executorService, nodeConf.artemisAddress).map { process }
}
private fun startWebserver(
executorService: ScheduledExecutorService,
nodeConf: FullNodeConfiguration,
debugPort: Int?): ListenableFuture<Process> {
val className = "net.corda.node.Corda" // cannot directly get class for this, so just use string
val separator = System.getProperty("file.separator")
val classpath = System.getProperty("java.class.path")
val path = System.getProperty("java.home") + separator + "bin" + separator + "java"
val debugPortArg = if (debugPort != null)
listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort")
else
emptyList()
val javaArgs = listOf(path) +
listOf("-Dname=node-${nodeConf.artemisAddress}-webserver") + debugPortArg +
listOf(
"-cp", classpath, className,
"--base-directory", nodeConf.baseDirectory.toString(),
"--webserver")
val builder = ProcessBuilder(javaArgs)
builder.redirectError(Paths.get("error.$className.log").toFile())
builder.inheritIO()
builder.directory(nodeConf.baseDirectory.toFile())
val process = builder.start()
return addressMustBeBound(executorService, nodeConf.webAddress).map { process }
}
}
}

View File

@ -1,92 +0,0 @@
package net.corda.node.internal
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.DealState
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.api.*
import java.time.LocalDateTime
import javax.ws.rs.core.Response
class APIServerImpl(val node: AbstractNode) : APIServer {
override fun serverTime(): LocalDateTime = LocalDateTime.now(node.services.clock)
override fun status(): Response {
return if (node.started) {
Response.ok("started").build()
} else {
Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("not started").build()
}
}
override fun info() = node.services.myInfo
override fun queryStates(query: StatesQuery): List<StateRef> {
// We're going to hard code two options here for now and assume that all LinearStates are deals
// Would like to maybe move to a model where we take something like a JEXL string, although don't want to develop
// something we can't later implement against a persistent store (i.e. need to pick / build a query engine)
if (query is StatesQuery.Selection) {
if (query.criteria is StatesQuery.Criteria.AllDeals) {
val states = node.services.vaultService.linearHeads
return states.values.map { it.ref }
} else if (query.criteria is StatesQuery.Criteria.Deal) {
val states = node.services.vaultService.linearHeadsOfType<DealState>().filterValues {
it.state.data.ref == query.criteria.ref
}
return states.values.map { it.ref }
}
}
return emptyList()
}
override fun fetchStates(states: List<StateRef>): Map<StateRef, TransactionState<ContractState>?> {
return node.services.vaultService.statesForRefs(states)
}
override fun fetchTransactions(txs: List<SecureHash>): Map<SecureHash, SignedTransaction?> {
throw UnsupportedOperationException()
}
override fun buildTransaction(type: ContractDefRef, steps: List<TransactionBuildStep>): SerializedBytes<WireTransaction> {
throw UnsupportedOperationException()
}
override fun generateTransactionSignature(tx: SerializedBytes<WireTransaction>): DigitalSignature.WithKey {
throw UnsupportedOperationException()
}
override fun commitTransaction(tx: SerializedBytes<WireTransaction>, signatures: List<DigitalSignature.WithKey>): SecureHash {
throw UnsupportedOperationException()
}
override fun invokeFlowSync(type: FlowRef, args: Map<String, Any?>): Any? {
return invokeFlowAsync(type, args).get()
}
private fun invokeFlowAsync(type: FlowRef, args: Map<String, Any?>): ListenableFuture<out Any?> {
if (type is FlowClassRef) {
val flowLogicRef = node.services.flowLogicRefFactory.createKotlin(type.className, args)
val flowInstance = node.services.flowLogicRefFactory.toFlowLogic(flowLogicRef)
return node.services.startFlow(flowInstance).resultFuture
} else {
throw UnsupportedOperationException("Unsupported FlowRef type: $type")
}
}
override fun fetchFlowsRequiringAttention(query: StatesQuery): Map<StateRef, FlowRequiringAttention> {
throw UnsupportedOperationException()
}
override fun provideFlowResponse(flow: FlowInstanceRef, choice: SecureHash, args: Map<String, Any?>) {
throw UnsupportedOperationException()
}
}

View File

@ -24,7 +24,6 @@ import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.FinalityFlow
import net.corda.flows.sendRequest
import net.corda.node.api.APIServer
import net.corda.node.services.api.*
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
@ -52,6 +51,7 @@ import net.corda.node.utilities.databaseTransaction
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
import org.slf4j.Logger
import java.io.File
import java.nio.file.FileAlreadyExistsException
import java.nio.file.Path
import java.security.KeyPair
@ -104,11 +104,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// low-performance prototyping period.
protected abstract val serverThread: AffinityExecutor
// Objects in this list will be scanned by the DataUploadServlet and can be handed new data via HTTP.
// Don't mutate this after startup.
protected val _servicesThatAcceptUploads = ArrayList<AcceptsFileUpload>()
val servicesThatAcceptUploads: List<AcceptsFileUpload> = _servicesThatAcceptUploads
private val flowFactories = ConcurrentHashMap<Class<*>, (Party) -> FlowLogic<*>>()
protected val partyKeys = mutableSetOf<KeyPair>()
@ -163,7 +158,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
lateinit var identity: IdentityService
lateinit var net: MessagingServiceInternal
lateinit var netMapCache: NetworkMapCache
lateinit var api: APIServer
lateinit var scheduler: NodeSchedulerService
lateinit var flowLogicFactory: FlowLogicRefFactory
lateinit var schemas: SchemaService
@ -219,7 +213,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
// the identity key. But the infrastructure to make that easy isn't here yet.
keyManagement = makeKeyManagementService()
api = APIServerImpl(this@AbstractNode)
flowLogicFactory = initialiseFlowLogicFactory()
scheduler = NodeSchedulerService(database, services, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
@ -228,6 +221,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
customServices.clear()
customServices.addAll(buildPluginServices(tokenizableServices))
val uploaders: List<FileUploader> = listOf(storageServices.first.attachments as NodeAttachmentService) +
customServices.filterIsInstance(AcceptsFileUpload::class.java)
(storage as StorageServiceImpl).initUploaders(uploaders)
// TODO: uniquenessProvider creation should be inside makeNotaryService(), but notary service initialisation
// depends on smm, while smm depends on tokenizableServices, which uniquenessProvider is part of
advertisedServices.singleOrNull { it.type.isNotary() }?.let {
@ -353,9 +350,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val service = serviceConstructor.apply(services)
serviceList.add(service)
tokenizableServices.add(service)
if (service is AcceptsFileUpload) {
_servicesThatAcceptUploads += service
}
}
return serviceList
}
@ -485,7 +479,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val attachments = makeAttachmentStorage(dir)
val checkpointStorage = DBCheckpointStorage()
val transactionStorage = DBTransactionStorage()
_servicesThatAcceptUploads += attachments
val stateMachineTransactionMappingStorage = DBTransactionMappingStorage()
return Pair(
constructStorageService(attachments, transactionStorage, stateMachineTransactionMappingStorage),

View File

@ -104,6 +104,12 @@ class CordaRPCOpsImpl(
override fun attachmentExists(id: SecureHash) = services.storageService.attachments.openAttachment(id) != null
override fun uploadAttachment(jar: InputStream) = services.storageService.attachments.importAttachment(jar)
override fun currentNodeTime(): Instant = Instant.now(services.clock)
override fun uploadFile(dataType: String, name: String?, file: InputStream): String {
val acceptor = services.storageService.uploaders.firstOrNull { it.accepts(dataType) }
return databaseTransaction(database) {
acceptor?.upload(file) ?: throw RuntimeException("Cannot find file upload acceptor for $dataType")
}
}
override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key)
override fun partyFromName(name: String) = services.identityService.partyFromName(name)

View File

@ -6,13 +6,12 @@ import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.div
import net.corda.core.flatMap
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.getOrThrow
import net.corda.core.messaging.RPCOps
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.success
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.serialization.NodeClock
@ -20,39 +19,19 @@ import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.servlets.AttachmentDownloadServlet
import net.corda.node.servlets.Config
import net.corda.node.servlets.DataUploadServlet
import net.corda.node.servlets.ResponseFilter
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.databaseTransaction
import org.eclipse.jetty.server.*
import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.servlet.DefaultServlet
import org.eclipse.jetty.servlet.FilterHolder
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.webapp.WebAppContext
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer
import org.jetbrains.exposed.sql.Database
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.lang.reflect.InvocationTargetException
import java.net.InetAddress
import java.nio.channels.FileLock
import java.time.Clock
import java.util.*
import javax.management.ObjectName
import javax.servlet.*
import kotlin.concurrent.thread
@ -111,7 +90,6 @@ class Node(override val configuration: FullNodeConfiguration,
// serialisation/deserialisation work.
override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)
lateinit var webServer: Server
var messageBroker: ArtemisMessagingServer? = null
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
@ -157,116 +135,6 @@ class Node(override val configuration: FullNodeConfiguration,
return networkMapConnection.flatMap { super.registerWithNetworkMap() }
}
// TODO: add flag to enable/disable webserver
private fun initWebServer(localRpc: CordaRPCOps): Server {
// Note that the web server handlers will all run concurrently, and not on the node thread.
val handlerCollection = HandlerCollection()
// Export JMX monitoring statistics and data over REST/JSON.
if (configuration.exportJMXto.split(',').contains("http")) {
val classpath = System.getProperty("java.class.path").split(System.getProperty("path.separator"))
val warpath = classpath.firstOrNull { it.contains("jolokia-agent-war-2") && it.endsWith(".war") }
if (warpath != null) {
handlerCollection.addHandler(WebAppContext().apply {
// Find the jolokia WAR file on the classpath.
contextPath = "/monitoring/json"
setInitParameter("mimeType", "application/json")
war = warpath
})
} else {
log.warn("Unable to locate Jolokia WAR on classpath")
}
}
// API, data upload and download to services (attachments, rates oracles etc)
handlerCollection.addHandler(buildServletContextHandler(localRpc))
val server = Server()
val connector = if (configuration.useHTTPS) {
val httpsConfiguration = HttpConfiguration()
httpsConfiguration.outputBufferSize = 32768
httpsConfiguration.addCustomizer(SecureRequestCustomizer())
val sslContextFactory = SslContextFactory()
sslContextFactory.keyStorePath = configuration.keyStoreFile.toString()
sslContextFactory.setKeyStorePassword(configuration.keyStorePassword)
sslContextFactory.setKeyManagerPassword(configuration.keyStorePassword)
sslContextFactory.setTrustStorePath(configuration.trustStoreFile.toString())
sslContextFactory.setTrustStorePassword(configuration.trustStorePassword)
sslContextFactory.setExcludeProtocols("SSL.*", "TLSv1", "TLSv1.1")
sslContextFactory.setIncludeProtocols("TLSv1.2")
sslContextFactory.setExcludeCipherSuites(".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*")
sslContextFactory.setIncludeCipherSuites(".*AES.*GCM.*")
val sslConnector = ServerConnector(server, SslConnectionFactory(sslContextFactory, "http/1.1"), HttpConnectionFactory(httpsConfiguration))
sslConnector.port = configuration.webAddress.port
sslConnector
} else {
val httpConfiguration = HttpConfiguration()
httpConfiguration.outputBufferSize = 32768
val httpConnector = ServerConnector(server, HttpConnectionFactory(httpConfiguration))
httpConnector.port = configuration.webAddress.port
httpConnector
}
server.connectors = arrayOf<Connector>(connector)
server.handler = handlerCollection
runOnStop += Runnable { server.stop() }
server.start()
printBasicNodeInfo("Embedded web server is listening on", "http://${InetAddress.getLocalHost().hostAddress}:${connector.port}/")
return server
}
private fun buildServletContextHandler(localRpc: CordaRPCOps): ServletContextHandler {
return ServletContextHandler().apply {
contextPath = "/"
setAttribute("node", this@Node)
addServlet(DataUploadServlet::class.java, "/upload/*")
addServlet(AttachmentDownloadServlet::class.java, "/attachments/*")
val resourceConfig = ResourceConfig()
// Add your API provider classes (annotated for JAX-RS) here
resourceConfig.register(Config(services))
resourceConfig.register(ResponseFilter())
resourceConfig.register(api)
val webAPIsOnClasspath = pluginRegistries.flatMap { x -> x.webApis }
for (webapi in webAPIsOnClasspath) {
log.info("Add plugin web API from attachment $webapi")
val customAPI = try {
webapi.apply(localRpc)
} catch (ex: InvocationTargetException) {
log.error("Constructor $webapi threw an error: ", ex.targetException)
continue
}
resourceConfig.register(customAPI)
}
val staticDirMaps = pluginRegistries.map { x -> x.staticServeDirs }
val staticDirs = staticDirMaps.flatMap { it.keys }.zip(staticDirMaps.flatMap { it.values })
staticDirs.forEach {
val staticDir = ServletHolder(DefaultServlet::class.java)
staticDir.setInitParameter("resourceBase", it.second)
staticDir.setInitParameter("dirAllowed", "true")
staticDir.setInitParameter("pathInfoOnly", "true")
addServlet(staticDir, "/web/${it.first}/*")
}
// Give the app a slightly better name in JMX rather than a randomly generated one and enable JMX
resourceConfig.addProperties(mapOf(ServerProperties.APPLICATION_NAME to "node.api",
ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED to "true"))
val container = ServletContainer(resourceConfig)
val jerseyServlet = ServletHolder(container)
addServlet(jerseyServlet, "/api/*")
jerseyServlet.initOrder = 0 // Initialise at server start
// Wrap all API calls in a database transaction.
val filterHolder = FilterHolder(DatabaseTransactionFilter(database))
addFilter(filterHolder, "/api/*", EnumSet.of(DispatcherType.REQUEST))
addFilter(filterHolder, "/upload/*", EnumSet.of(DispatcherType.REQUEST))
}
}
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider {
return when (type) {
RaftValidatingNotaryService.type -> with(configuration) {
@ -305,43 +173,27 @@ class Node(override val configuration: FullNodeConfiguration,
super.initialiseDatabasePersistence(insideTransaction)
}
private fun connectLocalRpcAsNodeUser(): CordaRPCOps {
val client = CordaRPCClient(configuration.artemisAddress, configuration)
client.start(NODE_USER, NODE_USER)
return client.proxy()
}
override fun start(): Node {
alreadyRunningNodeCheck()
super.start()
// Only start the service API requests once the network map registration is successfully complete
networkMapRegistrationFuture.success {
// This needs to be in a seperate thread so that we can reply to our own request to become RPC clients
thread(name = "WebServer") {
try {
webServer = initWebServer(connectLocalRpcAsNodeUser())
} catch(ex: Exception) {
// TODO: We need to decide if this is a fatal error, given the API is unavailable, or whether the API
// is not critical and we continue anyway.
log.error("Web server startup failed", ex)
}
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("net.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
// Only start the service API requests once the network map registration is complete
thread(name = "WebServer") {
networkMapRegistrationFuture.getOrThrow()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("net.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
shutdownThread = thread(start = false) {
@ -420,6 +272,7 @@ class Node(override val configuration: FullNodeConfiguration,
chain.doFilter(request, response)
}
}
override fun init(filterConfig: FilterConfig?) {}
override fun destroy() {}
}

View File

@ -1,5 +1,7 @@
package net.corda.node.services.api
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.FileUploader
import java.io.InputStream
/**
@ -7,16 +9,12 @@ import java.io.InputStream
*
* TODO: In future, also accept uploads over the MQ interface too.
*/
interface AcceptsFileUpload {
interface AcceptsFileUpload: FileUploader {
/** A string that prefixes the URLs, e.g. "attachments" or "interest-rates". Should be OK for URLs. */
val dataTypePrefix: String
/** What file extensions are acceptable for the file to be handed to upload() */
val acceptableFileExtensions: List<String>
/**
* Accepts the data in the given input stream, and returns some sort of useful return message that will be sent
* back to the user in the response.
*/
fun upload(data: InputStream): String
override fun accepts(prefix: String) = prefix == dataTypePrefix
}

View File

@ -35,6 +35,7 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAdd
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.commons.fileupload.MultipartStream
import org.objenesis.strategy.StdInstantiatorStrategy
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@ -145,6 +146,7 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(BufferedInputStream::class.java, InputStreamSerializer)
register(Class.forName("sun.net.www.protocol.jar.JarURLConnection\$JarURLInputStream"), InputStreamSerializer)
register(MultipartStream.ItemInputStream::class.java, InputStreamSerializer)
noReferencesWithin<WireTransaction>()
@ -205,6 +207,7 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(SimpleString::class.java)
register(ServiceEntry::class.java)
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(RuntimeException::class.java)
register(IllegalArgumentException::class.java)
register(ArrayIndexOutOfBoundsException::class.java)
register(IndexOutOfBoundsException::class.java)

View File

@ -1,12 +1,15 @@
package net.corda.node.services.persistence
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import net.corda.core.node.services.TransactionStorage
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.node.services.*
import net.corda.core.serialization.SingletonSerializeAsToken
open class StorageServiceImpl(override val attachments: AttachmentStorage,
override val validatedTransactions: TransactionStorage,
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage)
: SingletonSerializeAsToken(), TxWritableStorageService
: SingletonSerializeAsToken(), TxWritableStorageService {
lateinit override var uploaders: List<FileUploader>
fun initUploaders(uploadersList: List<FileUploader>) {
uploaders = uploadersList
}
}

View File

@ -0,0 +1,9 @@
package net.corda.node.utilities
import com.google.common.net.HostAndPort
import com.typesafe.config.Config
import java.nio.file.Path
import java.nio.file.Paths
fun Config.getHostAndPort(name: String): HostAndPort = HostAndPort.fromString(getString(name))
fun Config.getPath(name: String): Path = Paths.get(getString(name))

View File

@ -11,11 +11,12 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import net.corda.core.contracts.BusinessCalendar
import net.corda.core.crypto.*
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.i2p.crypto.eddsa.EdDSAPublicKey
import net.corda.core.node.services.IdentityService
import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
@ -23,9 +24,25 @@ import java.time.LocalDateTime
/**
* Utilities and serialisers for working with JSON representations of basic types. This adds Jackson support for
* the java.time API, some core types, and Kotlin data classes.
*
* TODO: This does not belong in node. It should be moved to the client module or a dedicated webserver module.
*/
object JsonSupport {
val javaTimeModule : Module by lazy {
interface PartyObjectMapper {
fun partyFromName(partyName: String): Party?
}
class RpcObjectMapper(val rpc: CordaRPCOps) : PartyObjectMapper, ObjectMapper() {
override fun partyFromName(partyName: String): Party? = rpc.partyFromName(partyName)
}
class IdentityObjectMapper(val identityService: IdentityService) : PartyObjectMapper, ObjectMapper(){
override fun partyFromName(partyName: String) = identityService.partyFromName(partyName)
}
class NoPartyObjectMapper: PartyObjectMapper, ObjectMapper() {
override fun partyFromName(partyName: String) = throw UnsupportedOperationException()
}
val javaTimeModule: Module by lazy {
SimpleModule("java.time").apply {
addSerializer(LocalDate::class.java, ToStringSerializer)
addDeserializer(LocalDate::class.java, LocalDateDeserializer)
@ -34,7 +51,7 @@ object JsonSupport {
}
}
val cordaModule : Module by lazy {
val cordaModule: Module by lazy {
SimpleModule("core").apply {
addSerializer(Party::class.java, PartySerializer)
addDeserializer(Party::class.java, PartyDeserializer)
@ -61,18 +78,24 @@ object JsonSupport {
}
}
fun createDefaultMapper(identities: IdentityService): ObjectMapper =
ServiceHubObjectMapper(identities).apply {
enable(SerializationFeature.INDENT_OUTPUT)
enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
/* Mapper requiring RPC support to deserialise parties from names */
fun createDefaultMapper(rpc: CordaRPCOps): ObjectMapper = configureMapper(RpcObjectMapper(rpc))
registerModule(javaTimeModule)
registerModule(cordaModule)
registerModule(KotlinModule())
}
/* For testing or situations where deserialising parties is not required */
fun createNonRpcMapper(): ObjectMapper = configureMapper(NoPartyObjectMapper())
class ServiceHubObjectMapper(val identities: IdentityService) : ObjectMapper()
/* For testing with an in memory identity service */
fun createInMemoryMapper(identityService: IdentityService) = configureMapper(IdentityObjectMapper(identityService))
private fun configureMapper(mapper: ObjectMapper): ObjectMapper = mapper.apply {
enable(SerializationFeature.INDENT_OUTPUT)
enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
registerModule(javaTimeModule)
registerModule(cordaModule)
registerModule(KotlinModule())
}
object ToStringSerializer : JsonSerializer<Any>() {
override fun serialize(obj: Any, generator: JsonGenerator, provider: SerializerProvider) {
@ -108,9 +131,10 @@ object JsonSupport {
if (parser.currentToken == JsonToken.FIELD_NAME) {
parser.nextToken()
}
val mapper = parser.codec as ServiceHubObjectMapper
val mapper = parser.codec as PartyObjectMapper
// TODO this needs to use some industry identifier(s) not just these human readable names
return mapper.identities.partyFromName(parser.text) ?: throw JsonParseException(parser, "Could not find a Party with name: ${parser.text}")
return mapper.partyFromName(parser.text) ?: throw JsonParseException(parser, "Could not find a Party with name ${parser.text}")
}
}

View File

@ -0,0 +1,179 @@
package net.corda.node.webserver
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.webserver.internal.APIServerImpl
import net.corda.node.webserver.servlets.AttachmentDownloadServlet
import net.corda.node.webserver.servlets.DataUploadServlet
import net.corda.node.webserver.servlets.ObjectMapperConfig
import net.corda.node.webserver.servlets.ResponseFilter
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.eclipse.jetty.server.*
import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.servlet.DefaultServlet
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.webapp.WebAppContext
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer
import java.lang.reflect.InvocationTargetException
import java.net.InetAddress
import java.util.*
// TODO: Split into a separate module under client that packages into WAR formats.
class WebServer(val config: FullNodeConfiguration) {
private companion object {
val log = loggerFor<WebServer>()
val retryDelay = 1000L // Milliseconds
}
val address = config.webAddress
private lateinit var server: Server
fun start() {
printBasicNodeInfo("Starting as webserver: ${config.webAddress}")
server = initWebServer(retryConnectLocalRpc())
}
fun run() {
while (server.isRunning) {
Thread.sleep(100) // TODO: Redesign
}
}
private fun initWebServer(localRpc: CordaRPCOps): Server {
// Note that the web server handlers will all run concurrently, and not on the node thread.
val handlerCollection = HandlerCollection()
// TODO: Move back into the node itself.
// Export JMX monitoring statistics and data over REST/JSON.
if (config.exportJMXto.split(',').contains("http")) {
val classpath = System.getProperty("java.class.path").split(System.getProperty("path.separator"))
val warpath = classpath.firstOrNull { it.contains("jolokia-agent-war-2") && it.endsWith(".war") }
if (warpath != null) {
handlerCollection.addHandler(WebAppContext().apply {
// Find the jolokia WAR file on the classpath.
contextPath = "/monitoring/json"
setInitParameter("mimeType", "application/json")
war = warpath
})
} else {
log.warn("Unable to locate Jolokia WAR on classpath")
}
}
// API, data upload and download to services (attachments, rates oracles etc)
handlerCollection.addHandler(buildServletContextHandler(localRpc))
val server = Server()
val connector = if (config.useHTTPS) {
val httpsConfiguration = HttpConfiguration()
httpsConfiguration.outputBufferSize = 32768
httpsConfiguration.addCustomizer(SecureRequestCustomizer())
val sslContextFactory = SslContextFactory()
sslContextFactory.keyStorePath = config.keyStoreFile.toString()
sslContextFactory.setKeyStorePassword(config.keyStorePassword)
sslContextFactory.setKeyManagerPassword(config.keyStorePassword)
sslContextFactory.setTrustStorePath(config.trustStoreFile.toString())
sslContextFactory.setTrustStorePassword(config.trustStorePassword)
sslContextFactory.setExcludeProtocols("SSL.*", "TLSv1", "TLSv1.1")
sslContextFactory.setIncludeProtocols("TLSv1.2")
sslContextFactory.setExcludeCipherSuites(".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*")
sslContextFactory.setIncludeCipherSuites(".*AES.*GCM.*")
val sslConnector = ServerConnector(server, SslConnectionFactory(sslContextFactory, "http/1.1"), HttpConnectionFactory(httpsConfiguration))
sslConnector.port = address.port
sslConnector
} else {
val httpConfiguration = HttpConfiguration()
httpConfiguration.outputBufferSize = 32768
val httpConnector = ServerConnector(server, HttpConnectionFactory(httpConfiguration))
log.info("Starting webserver on address $address")
httpConnector.port = address.port
httpConnector
}
server.connectors = arrayOf<Connector>(connector)
server.handler = handlerCollection
//runOnStop += Runnable { server.stop() }
server.start()
log.info("Server started")
log.info("Embedded web server is listening on", "http://${InetAddress.getLocalHost().hostAddress}:${connector.port}/")
return server
}
private fun buildServletContextHandler(localRpc: CordaRPCOps): ServletContextHandler {
return ServletContextHandler().apply {
contextPath = "/"
setAttribute("rpc", localRpc)
addServlet(DataUploadServlet::class.java, "/upload/*")
addServlet(AttachmentDownloadServlet::class.java, "/attachments/*")
val resourceConfig = ResourceConfig()
resourceConfig.register(ObjectMapperConfig(localRpc))
resourceConfig.register(ResponseFilter())
resourceConfig.register(APIServerImpl(localRpc))
val webAPIsOnClasspath = pluginRegistries.flatMap { x -> x.webApis }
for (webapi in webAPIsOnClasspath) {
log.info("Add plugin web API from attachment $webapi")
val customAPI = try {
webapi.apply(localRpc)
} catch (ex: InvocationTargetException) {
log.error("Constructor $webapi threw an error: ", ex.targetException)
continue
}
resourceConfig.register(customAPI)
}
val staticDirMaps = pluginRegistries.map { x -> x.staticServeDirs }
val staticDirs = staticDirMaps.flatMap { it.keys }.zip(staticDirMaps.flatMap { it.values })
staticDirs.forEach {
val staticDir = ServletHolder(DefaultServlet::class.java)
staticDir.setInitParameter("resourceBase", it.second)
staticDir.setInitParameter("dirAllowed", "true")
staticDir.setInitParameter("pathInfoOnly", "true")
addServlet(staticDir, "/web/${it.first}/*")
}
// Give the app a slightly better name in JMX rather than a randomly generated one and enable JMX
resourceConfig.addProperties(mapOf(ServerProperties.APPLICATION_NAME to "node.api",
ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED to "true"))
val container = ServletContainer(resourceConfig)
val jerseyServlet = ServletHolder(container)
addServlet(jerseyServlet, "/api/*")
jerseyServlet.initOrder = 0 // Initialise at server start
}
}
private fun retryConnectLocalRpc(): CordaRPCOps {
while (true) {
try {
return connectLocalRpcAsNodeUser()
} catch (e: ActiveMQNotConnectedException) {
log.debug("Could not connect to ${config.artemisAddress} due to exception: ", e)
Thread.sleep(retryDelay)
}
}
}
private fun connectLocalRpcAsNodeUser(): CordaRPCOps {
log.info("Connecting to node at ${config.artemisAddress} as node user")
val client = CordaRPCClient(config.artemisAddress, config)
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
return client.proxy()
}
/** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */
val pluginRegistries: List<CordaPluginRegistry> by lazy {
ServiceLoader.load(CordaPluginRegistry::class.java).toList()
}
}

View File

@ -0,0 +1,43 @@
package net.corda.node.webserver.api
import net.corda.core.node.NodeInfo
import java.time.LocalDateTime
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
/**
* Top level interface to external interaction with the distributed ledger.
*
* Wherever a list is returned by a fetchXXX method that corresponds with an input list, that output list will have optional elements
* where a null indicates "missing" and the elements returned will be in the order corresponding with the input list.
*
*/
@Path("")
interface APIServer {
/**
* Report current UTC time as understood by the platform.
*/
@GET
@Path("servertime")
@Produces(MediaType.APPLICATION_JSON)
fun serverTime(): LocalDateTime
/**
* Report whether this node is started up or not.
*/
@GET
@Path("status")
@Produces(MediaType.TEXT_PLAIN)
fun status(): Response
/**
* Report this node's configuration and identities.
*/
@GET
@Path("info")
@Produces(MediaType.APPLICATION_JSON)
fun info(): NodeInfo
}

View File

@ -1,4 +1,4 @@
package net.corda.node.api
package net.corda.node.webserver.api
/**
* Extremely rudimentary query language which should most likely be replaced with a product.

View File

@ -0,0 +1,23 @@
package net.corda.node.webserver.internal
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.webserver.api.*
import java.time.LocalDateTime
import java.time.ZoneId
import javax.ws.rs.core.Response
class APIServerImpl(val rpcOps: CordaRPCOps) : APIServer {
override fun serverTime(): LocalDateTime {
return LocalDateTime.ofInstant(rpcOps.currentNodeTime(), ZoneId.of("UTC"))
}
/**
* This endpoint is for polling if the webserver is serving. It will always return 200.
*/
override fun status(): Response {
return Response.ok("started").build()
}
override fun info() = rpcOps.nodeIdentity()
}

View File

@ -1,4 +1,4 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.StorageService

View File

@ -1,5 +1,6 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.Node
import net.corda.node.services.api.AcceptsFileUpload
@ -12,26 +13,19 @@ import javax.servlet.http.HttpServletResponse
/**
* Accepts binary streams, finds the right [AcceptsFileUpload] implementor and hands the stream off to it.
*/
class DataUploadServlet : HttpServlet() {
class DataUploadServlet: HttpServlet() {
private val log = loggerFor<DataUploadServlet>()
override fun doPost(req: HttpServletRequest, resp: HttpServletResponse) {
val node = servletContext.getAttribute("node") as Node
@Suppress("DEPRECATION") // Bogus warning due to superclass static method being deprecated.
val isMultipart = ServletFileUpload.isMultipartContent(req)
val rpc = servletContext.getAttribute("rpc") as CordaRPCOps
if (!isMultipart) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "This end point is for data uploads only.")
return
}
val acceptor: AcceptsFileUpload? = findAcceptor(node, req)
if (acceptor == null) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Got a file upload request for an unknown data type")
return
}
val upload = ServletFileUpload()
val iterator = upload.getItemIterator(req)
val messages = ArrayList<String>()
@ -43,18 +37,15 @@ class DataUploadServlet : HttpServlet() {
while (iterator.hasNext()) {
val item = iterator.next()
if (item.name != null && !acceptor.acceptableFileExtensions.any { item.name.endsWith(it) }) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
"${item.name}: Must be have a filename ending in one of: ${acceptor.acceptableFileExtensions}")
return
}
log.info("Receiving ${item.name}")
item.openStream().use {
val message = acceptor.upload(it)
log.info("${item.name} successfully accepted: $message")
messages += message
try {
val dataType = req.pathInfo.substring(1).substringBefore('/')
messages += rpc.uploadFile(dataType, item.name, item.openStream())
log.info("${item.name} successfully accepted: ${messages.last()}")
} catch(e: RuntimeException) {
println(e)
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Got a file upload request for an unknown data type")
}
}
@ -62,8 +53,4 @@ class DataUploadServlet : HttpServlet() {
val writer = resp.writer
messages.forEach { writer.println(it) }
}
private fun findAcceptor(node: Node, req: HttpServletRequest): AcceptsFileUpload? {
return node.servicesThatAcceptUploads.firstOrNull { req.pathInfo.substring(1).substringBefore('/') == it.dataTypePrefix }
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import com.fasterxml.jackson.databind.ObjectMapper
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.ServiceHub
import net.corda.node.utilities.JsonSupport
import javax.ws.rs.ext.ContextResolver
@ -11,7 +12,7 @@ import javax.ws.rs.ext.Provider
* and to organise serializers / deserializers for java.time.* classes as necessary.
*/
@Provider
class Config(val services: ServiceHub) : ContextResolver<ObjectMapper> {
val defaultObjectMapper = JsonSupport.createDefaultMapper(services.identityService)
class ObjectMapperConfig(rpc: CordaRPCOps) : ContextResolver<ObjectMapper> {
val defaultObjectMapper = JsonSupport.createDefaultMapper(rpc)
override fun getContext(type: Class<*>) = defaultObjectMapper
}

View File

@ -1,4 +1,4 @@
package net.corda.node.servlets
package net.corda.node.webserver.servlets
import javax.ws.rs.container.ContainerRequestContext
import javax.ws.rs.container.ContainerResponseContext

View File

@ -17,7 +17,8 @@ class ArgsParserTest {
baseDirectory = workingDirectory,
configFile = workingDirectory / "node.conf",
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
@ -28,7 +29,8 @@ class ArgsParserTest {
baseDirectory = expectedBaseDir,
configFile = expectedBaseDir / "node.conf",
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
@ -39,7 +41,8 @@ class ArgsParserTest {
baseDirectory = baseDirectory,
configFile = baseDirectory / "node.conf",
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
@ -49,7 +52,8 @@ class ArgsParserTest {
baseDirectory = workingDirectory,
configFile = workingDirectory / "different.conf",
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
@ -60,7 +64,19 @@ class ArgsParserTest {
baseDirectory = workingDirectory,
configFile = configFile,
help = false,
logToConsole = false))
logToConsole = false,
isWebserver = false))
}
@Test
fun `just webserver `() {
val cmdLineOptions = parser.parse("--webserver")
assertThat(cmdLineOptions).isEqualTo(CmdLineOptions(
baseDirectory = workingDirectory,
configFile = workingDirectory / "node.conf",
help = false,
logToConsole = false,
isWebserver = true))
}
@Test

View File

@ -15,7 +15,7 @@ import kotlin.test.assertEquals
class JsonSupportTest {
companion object {
val mapper = JsonSupport.createDefaultMapper(MockIdentityService(mutableListOf()))
val mapper = JsonSupport.createNonRpcMapper()
}
@Property

View File

@ -1 +1 @@
gradlePluginsVersion=0.7
gradlePluginsVersion=0.7.1

View File

@ -118,4 +118,4 @@ private fun sslConfigFor(nodename: String, certsPath: String?): SSLConfiguration
override val trustStorePassword: String = "trustpass"
override val certificatesDirectory: Path = if (certsPath != null) Paths.get(certsPath) else Paths.get("build") / "nodes" / nodename / "certificates"
}
}
}

View File

@ -7,6 +7,7 @@ import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.node.driver.driver
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.utilities.getHostAndPort
import org.junit.Test
class BankOfCordaHttpAPITest {
@ -17,8 +18,8 @@ class BankOfCordaHttpAPITest {
startNode("BankOfCorda", setOf(ServiceInfo(SimpleNotaryService.type))),
startNode("BigCorporation")
).getOrThrow()
val nodeBankOfCordaApiAddr = nodeBankOfCorda.configuration.webAddress
val nodeBankOfCordaApiAddr = startWebserver(nodeBankOfCorda).getOrThrow()
assert(BankOfCordaClientApi(nodeBankOfCordaApiAddr).requestWebIssue(IssueRequestParams(1000, "USD", "BigCorporation", "1", "BankOfCorda")))
}, isDebug = true)
}
}
}

View File

@ -24,8 +24,8 @@ class BankOfCordaRPCClientTest {
driver(dsl = {
val user = User("user1", "test", permissions = setOf(startFlowPermission<IssuanceRequester>()))
val (nodeBankOfCorda, nodeBigCorporation) = Futures.allAsList(
startNode("BankOfCorda", setOf(ServiceInfo(SimpleNotaryService.type)), listOf(user)),
startNode("BigCorporation", rpcUsers = listOf(user))
startNode("BankOfCorda", setOf(ServiceInfo(SimpleNotaryService.type)), listOf(user)),
startNode("BigCorporation", rpcUsers = listOf(user))
).getOrThrow()
// Bank of Corda RPC Client
@ -86,4 +86,4 @@ class BankOfCordaRPCClientTest {
}
}, isDebug = true)
}
}
}

View File

@ -50,8 +50,9 @@ private class BankOfCordaDriver {
driver(dsl = {
val user = User("user1", "test", permissions = setOf(startFlowPermission<CashFlow>(), startFlowPermission<IssuerFlow.IssuanceRequester>()))
startNode("Notary", setOf(ServiceInfo(SimpleNotaryService.type)))
startNode("BankOfCorda", rpcUsers = listOf(user), advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.USD"))))
val bankOfCorda = startNode("BankOfCorda", rpcUsers = listOf(user), advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.USD"))))
startNode("BigCorporation", rpcUsers = listOf(user))
startWebserver(bankOfCorda.get())
waitForAllNodesToFinish()
}, isDebug = true)
}
@ -75,7 +76,8 @@ private class BankOfCordaDriver {
}
}
catch (e: Exception) {
printHelp(parser)
println("Exception occurred: $e \n ${e.printStackTrace()}")
exitProcess(1)
}
}
}

View File

@ -35,14 +35,20 @@ class IRSDemoTest : IntegrationTestCategory {
startNode("Bank B")
).getOrThrow()
val (controllerAddr, nodeAAddr, nodeBAddr) = Futures.allAsList(
startWebserver(controller),
startWebserver(nodeA),
startWebserver(nodeB)
).getOrThrow()
val nextFixingDates = getFixingDateObservable(nodeA.configuration)
runUploadRates(controller.configuration.webAddress)
runTrade(nodeA.configuration.webAddress)
runUploadRates(controllerAddr)
runTrade(nodeAAddr)
// Wait until the initial trade and all scheduled fixings up to the current date have finished
nextFixingDates.first { it == null || it > currentDate }
runDateChange(nodeB.configuration.webAddress)
runDateChange(nodeBAddr)
nextFixingDates.first { it == null || it > futureDate }
}
}
@ -78,4 +84,4 @@ class IRSDemoTest : IntegrationTestCategory {
val url = URL("http://$host/upload/interest-rates")
assert(uploadFile(url, fileContents))
}
}
}

View File

@ -1,5 +1,7 @@
package net.corda.irs
import com.google.common.util.concurrent.Futures
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.irs.api.NodeInterestRates
import net.corda.node.driver.driver
@ -11,9 +13,16 @@ import net.corda.node.services.transactions.SimpleNotaryService
*/
fun main(args: Array<String>) {
driver(dsl = {
startNode("Notary", setOf(ServiceInfo(SimpleNotaryService.type), ServiceInfo(NodeInterestRates.type))).get()
startNode("Bank A")
startNode("Bank B")
val (controller, nodeA, nodeB) = Futures.allAsList(
startNode("Notary", setOf(ServiceInfo(SimpleNotaryService.type), ServiceInfo(NodeInterestRates.type))),
startNode("Bank A"),
startNode("Bank B")
).getOrThrow()
startWebserver(controller)
startWebserver(nodeA)
startWebserver(nodeB)
waitForAllNodesToFinish()
}, useTestClock = true, isDebug = true)
}

View File

@ -32,7 +32,7 @@ import java.util.*
* A simulation in which banks execute interest rate swaps with each other, including the fixing events.
*/
class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(networkSendManuallyPumped, runAsync, latencyInjector) {
val om = net.corda.node.utilities.JsonSupport.createDefaultMapper(MockIdentityService(network.identities))
val om = net.corda.node.utilities.JsonSupport.createInMemoryMapper(MockIdentityService(network.identities))
init {
currentDateAndTime = LocalDate.of(2016, 3, 8).atStartOfDay()

View File

@ -24,7 +24,6 @@ fun main(args: Array<String>) {
/** Interface for using the notary demo API from a client. */
private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
private val notary by lazy {
rpc.networkMapUpdates().first.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
}
@ -37,17 +36,21 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
private val TRANSACTION_COUNT = 10
}
/** Makes a call to the demo api to start transaction notarisation. */
/** Makes calls to the node rpc to start transaction notarisation. */
fun startNotarisation() {
val response = notarise(TRANSACTION_COUNT)
println(response)
notarise(TRANSACTION_COUNT)
}
fun notarise(count: Int): String {
fun notarise(count: Int) {
val transactions = buildTransactions(count)
val signers = notariseTransactions(transactions)
val transactionSigners = transactions.zip(signers).map {
val (tx, signer) = it
"Tx [${tx.tx.id.prefixChars()}..] signed by $signer"
}.joinToString("\n")
return buildResponse(transactions, signers)
println("Notary: \"${notary.name}\", with composite key: ${notary.owningKey}\n" +
"Notarised ${transactions.size} transactions:\n" + transactionSigners)
}
/**
@ -72,20 +75,7 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
val signatureFutures = transactions.map {
rpc.startFlow(NotaryFlow::Client, it).returnValue.toBlocking().toFuture()
}
val signers = signatureFutures.map { it.get().by.toStringShort() }
return signers
}
/** Builds a response for the caller containing the list of transaction ids and corresponding signer keys. */
private fun buildResponse(transactions: List<SignedTransaction>, signers: List<String>): String {
val transactionSigners = transactions.zip(signers).map {
val (tx, signer) = it
"Tx [${tx.tx.id.prefixChars()}..] signed by $signer"
}.joinToString("\n")
val response = "Notary: \"${notary.name}\", with composite key: ${notary.owningKey}\n" +
"Notarised ${transactions.size} transactions:\n" + transactionSigners
return response
return signatureFutures.map { it.get().by.toStringShort() }
}
}

View File

@ -1,9 +1,9 @@
package net.corda.vega
import com.google.common.util.concurrent.Futures
import com.opengamma.strata.product.common.BuySell
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.node.driver.NodeHandle
import net.corda.node.driver.driver
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.IntegrationTestCategory
@ -12,14 +12,11 @@ import net.corda.vega.api.PortfolioApi
import net.corda.vega.api.PortfolioApiUtils
import net.corda.vega.api.SwapDataModel
import net.corda.vega.api.SwapDataView
import net.corda.vega.portfolio.Portfolio
import org.junit.Test
import java.math.BigDecimal
import java.time.LocalDate
import java.util.*
import java.util.concurrent.Future
class SimmValuationTest: IntegrationTestCategory {
class SimmValuationTest : IntegrationTestCategory {
private companion object {
// SIMM demo can only currently handle one valuation date due to a lack of market data or a market data source.
val valuationDate = LocalDate.parse("2016-06-06")
@ -31,23 +28,20 @@ class SimmValuationTest: IntegrationTestCategory {
@Test fun `runs SIMM valuation demo`() {
driver(isDebug = true) {
startNode("Controller", setOf(ServiceInfo(SimpleNotaryService.type))).getOrThrow()
val nodeA = getSimmNodeApi(startNode(nodeALegalName))
val nodeB = getSimmNodeApi(startNode(nodeBLegalName))
val nodeBParty = getPartyWithName(nodeA, nodeBLegalName)
val nodeAParty = getPartyWithName(nodeB, nodeALegalName)
val (nodeA, nodeB) = Futures.allAsList(startNode(nodeALegalName), startNode(nodeBLegalName)).getOrThrow()
val (nodeAApi, nodeBApi) = Futures.allAsList(startWebserver(nodeA), startWebserver(nodeB))
.getOrThrow()
.map { HttpApi.fromHostAndPort(it, "api/simmvaluationdemo") }
val nodeBParty = getPartyWithName(nodeAApi, nodeBLegalName)
val nodeAParty = getPartyWithName(nodeBApi, nodeALegalName)
assert(createTradeBetween(nodeA, nodeBParty, testTradeId))
assert(tradeExists(nodeB, nodeAParty, testTradeId))
assert(runValuationsBetween(nodeA, nodeBParty))
assert(valuationExists(nodeB, nodeAParty))
assert(createTradeBetween(nodeAApi, nodeBParty, testTradeId))
assert(tradeExists(nodeBApi, nodeAParty, testTradeId))
assert(runValuationsBetween(nodeAApi, nodeBParty))
assert(valuationExists(nodeBApi, nodeAParty))
}
}
private fun getSimmNodeApi(futureNode: Future<NodeHandle>): HttpApi {
val nodeAddr = futureNode.getOrThrow().configuration.webAddress
return HttpApi.fromHostAndPort(nodeAddr, "api/simmvaluationdemo")
}
private fun getPartyWithName(partyApi: HttpApi, countryparty: String): PortfolioApi.ApiParty =
getAvailablePartiesFor(partyApi).counterparties.single { it.text == countryparty }

View File

@ -1,5 +1,7 @@
package net.corda.vega
import com.google.common.util.concurrent.Futures
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.node.driver.driver
import net.corda.node.services.transactions.SimpleNotaryService
@ -12,9 +14,16 @@ import net.corda.node.services.transactions.SimpleNotaryService
fun main(args: Array<String>) {
driver(dsl = {
startNode("Controller", setOf(ServiceInfo(SimpleNotaryService.type)))
startNode("Bank A")
startNode("Bank B")
startNode("Bank C")
val (nodeA, nodeB, nodeC) = Futures.allAsList(
startNode("Bank A"),
startNode("Bank B"),
startNode("Bank C")
).getOrThrow()
startWebserver(nodeA)
startWebserver(nodeB)
startWebserver(nodeC)
waitForAllNodesToFinish()
}, isDebug = true)
}

View File

@ -19,10 +19,10 @@ class TraderDemoTest {
val demoUser = listOf(User("demo", "demo", permissions))
val user = User("user1", "test", permissions = setOf(startFlowPermission<IssuerFlow.IssuanceRequester>()))
val (nodeA, nodeB) = Futures.allAsList(
startNode("Bank A", rpcUsers = demoUser),
startNode("Bank B", rpcUsers = demoUser),
startNode("BankOfCorda", rpcUsers = listOf(user)),
startNode("Notary", setOf(ServiceInfo(SimpleNotaryService.type)))
startNode("Bank A", rpcUsers = demoUser),
startNode("Bank B", rpcUsers = demoUser),
startNode("BankOfCorda", rpcUsers = listOf(user)),
startNode("Notary", setOf(ServiceInfo(SimpleNotaryService.type)))
).getOrThrow()
val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB)
.map { it.rpcClientToNode().start(demoUser[0].username, demoUser[0].password).proxy() }

View File

@ -4,6 +4,7 @@ package net.corda.testing
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import com.typesafe.config.Config
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.*
import net.corda.core.flows.FlowLogic
@ -155,3 +156,5 @@ data class TestNodeConfiguration(
override val emailAddress: String = "",
override val exportJMXto: String = "",
override val devMode: Boolean = true) : NodeConfiguration
fun Config.getHostAndPort(name: String) = HostAndPort.fromString(getString(name))

View File

@ -155,6 +155,7 @@ open class MockTransactionStorage : TransactionStorage {
@ThreadSafe
class MockStorageService(override val attachments: AttachmentStorage = MockAttachmentStorage(),
override val validatedTransactions: TransactionStorage = MockTransactionStorage(),
override val uploaders: List<FileUploader> = listOf<FileUploader>(),
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage = MockStateMachineRecordedTransactionMappingStorage())
: SingletonSerializeAsToken(), TxWritableStorageService