mirror of
https://github.com/corda/corda.git
synced 2025-03-12 07:23:59 +00:00
CORDA-1156 - Fixed displayed nodeInfo and stateMachineSnapshot in InteractiveShell. (#2732)
This commit is contained in:
parent
d6f9bbfe2e
commit
194a690ef4
@ -39,6 +39,7 @@ buildscript {
|
|||||||
ext.jetty_version = '9.4.7.v20170914'
|
ext.jetty_version = '9.4.7.v20170914'
|
||||||
ext.jersey_version = '2.25'
|
ext.jersey_version = '2.25'
|
||||||
ext.jolokia_version = '1.3.7'
|
ext.jolokia_version = '1.3.7'
|
||||||
|
ext.json_version = '20180130'
|
||||||
ext.assertj_version = '3.8.0'
|
ext.assertj_version = '3.8.0'
|
||||||
ext.slf4j_version = '1.7.25'
|
ext.slf4j_version = '1.7.25'
|
||||||
ext.log4j_version = '2.9.1'
|
ext.log4j_version = '2.9.1'
|
||||||
|
@ -96,6 +96,7 @@ dependencies {
|
|||||||
|
|
||||||
// Jackson support: serialisation to/from JSON, YAML, etc
|
// Jackson support: serialisation to/from JSON, YAML, etc
|
||||||
compile project(':client:jackson')
|
compile project(':client:jackson')
|
||||||
|
compile group: 'org.json', name: 'json', version: json_version
|
||||||
|
|
||||||
// Coda Hale's Metrics: for monitoring of key statistics
|
// Coda Hale's Metrics: for monitoring of key statistics
|
||||||
compile "io.dropwizard.metrics:metrics-core:3.1.2"
|
compile "io.dropwizard.metrics:metrics-core:3.1.2"
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package net.corda.node.shell
|
package net.corda.node.shell
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonFactory
|
|
||||||
import com.fasterxml.jackson.core.JsonGenerator
|
import com.fasterxml.jackson.core.JsonGenerator
|
||||||
import com.fasterxml.jackson.core.JsonParser
|
import com.fasterxml.jackson.core.JsonParser
|
||||||
import com.fasterxml.jackson.databind.*
|
import com.fasterxml.jackson.databind.*
|
||||||
@ -15,6 +14,7 @@ import net.corda.core.concurrent.CordaFuture
|
|||||||
import net.corda.core.contracts.UniqueIdentifier
|
import net.corda.core.contracts.UniqueIdentifier
|
||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.internal.*
|
import net.corda.core.internal.*
|
||||||
import net.corda.core.internal.concurrent.doneFuture
|
import net.corda.core.internal.concurrent.doneFuture
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
@ -22,7 +22,9 @@ import net.corda.core.messaging.CordaRPCOps
|
|||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.messaging.FlowProgressHandle
|
import net.corda.core.messaging.FlowProgressHandle
|
||||||
import net.corda.core.messaging.StateMachineUpdate
|
import net.corda.core.messaging.StateMachineUpdate
|
||||||
|
import net.corda.core.node.NodeInfo
|
||||||
import net.corda.core.node.services.IdentityService
|
import net.corda.core.node.services.IdentityService
|
||||||
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.Node
|
import net.corda.node.internal.Node
|
||||||
import net.corda.node.internal.StartedNode
|
import net.corda.node.internal.StartedNode
|
||||||
import net.corda.node.internal.security.AdminSubject
|
import net.corda.node.internal.security.AdminSubject
|
||||||
@ -52,6 +54,7 @@ import org.crsh.util.Utils
|
|||||||
import org.crsh.vfs.FS
|
import org.crsh.vfs.FS
|
||||||
import org.crsh.vfs.spi.file.FileMountFactory
|
import org.crsh.vfs.spi.file.FileMountFactory
|
||||||
import org.crsh.vfs.spi.url.ClassPathMountFactory
|
import org.crsh.vfs.spi.url.ClassPathMountFactory
|
||||||
|
import org.json.JSONObject
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.Subscriber
|
import rx.Subscriber
|
||||||
@ -196,13 +199,35 @@ object InteractiveShell {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createOutputMapper(factory: JsonFactory): ObjectMapper {
|
private object NodeInfoSerializer : JsonSerializer<NodeInfo>() {
|
||||||
return JacksonSupport.createNonRpcMapper(factory).apply {
|
|
||||||
|
override fun serialize(nodeInfo: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) {
|
||||||
|
|
||||||
|
val json = JSONObject()
|
||||||
|
json["addresses"] = nodeInfo.addresses.map { address -> address.serialise() }
|
||||||
|
json["legalIdentities"] = nodeInfo.legalIdentities.map { address -> address.serialise() }
|
||||||
|
json["platformVersion"] = nodeInfo.platformVersion
|
||||||
|
json["serial"] = nodeInfo.serial
|
||||||
|
gen.writeRaw(json.toString())
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun NetworkHostAndPort.serialise() = this.toString()
|
||||||
|
private fun Party.serialise() = JSONObject().put("name", this.name)
|
||||||
|
|
||||||
|
private operator fun JSONObject.set(key: String, value: Any?): JSONObject {
|
||||||
|
return put(key, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun createOutputMapper(): ObjectMapper {
|
||||||
|
|
||||||
|
return JacksonSupport.createNonRpcMapper().apply {
|
||||||
// Register serializers for stateful objects from libraries that are special to the RPC system and don't
|
// Register serializers for stateful objects from libraries that are special to the RPC system and don't
|
||||||
// make sense to print out to the screen. For classes we own, annotations can be used instead.
|
// make sense to print out to the screen. For classes we own, annotations can be used instead.
|
||||||
val rpcModule = SimpleModule()
|
val rpcModule = SimpleModule()
|
||||||
rpcModule.addSerializer(Observable::class.java, ObservableSerializer)
|
rpcModule.addSerializer(Observable::class.java, ObservableSerializer)
|
||||||
rpcModule.addSerializer(InputStream::class.java, InputStreamSerializer)
|
rpcModule.addSerializer(InputStream::class.java, InputStreamSerializer)
|
||||||
|
rpcModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer)
|
||||||
registerModule(rpcModule)
|
registerModule(rpcModule)
|
||||||
|
|
||||||
disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
|
disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
|
||||||
@ -211,7 +236,7 @@ object InteractiveShell {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This should become the default renderer rather than something used specifically by commands.
|
// TODO: This should become the default renderer rather than something used specifically by commands.
|
||||||
private val yamlMapper by lazy { createOutputMapper(YAMLFactory()) }
|
private val outputMapper by lazy { createOutputMapper() }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out
|
* Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out
|
||||||
@ -394,11 +419,19 @@ object InteractiveShell {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture<Unit> {
|
private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture<Unit> {
|
||||||
val printerFun = yamlMapper::writeValueAsString
|
|
||||||
toStream.println(printerFun(response))
|
val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) }
|
||||||
toStream.flush()
|
val mappingFunction: (Any?) -> String = { value ->
|
||||||
return maybeFollow(response, printerFun, toStream)
|
if (value is Collection<*>) {
|
||||||
|
value.joinToString(",${System.lineSeparator()} ", "[${System.lineSeparator()} ", "${System.lineSeparator()}]") { element ->
|
||||||
|
mapElement(element)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
mapElement(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return maybeFollow(response, mappingFunction, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() {
|
private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() {
|
||||||
@ -421,6 +454,7 @@ object InteractiveShell {
|
|||||||
override fun onNext(t: Any?) {
|
override fun onNext(t: Any?) {
|
||||||
count++
|
count++
|
||||||
toStream.println("Observation $count: " + printerFun(t))
|
toStream.println("Observation $count: " + printerFun(t))
|
||||||
|
toStream.flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Synchronized
|
@Synchronized
|
||||||
@ -431,25 +465,32 @@ object InteractiveShell {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CordaFuture<Unit> {
|
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
|
||||||
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
|
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
|
||||||
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
|
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
|
||||||
// object graphs that contain yet more observables. So we just look for top level responses that follow
|
// object graphs that contain yet more observables. So we just look for top level responses that follow
|
||||||
// the standard "track" pattern, and print them until the user presses Ctrl-C
|
// the standard "track" pattern, and print them until the user presses Ctrl-C
|
||||||
if (response == null) return doneFuture(Unit)
|
if (response == null) return doneFuture(Unit)
|
||||||
|
|
||||||
val observable: Observable<*> = when (response) {
|
if (response !is Observable<*> && response !is DataFeed<*, *>) {
|
||||||
is Observable<*> -> response
|
out.println(printerFun(response))
|
||||||
is DataFeed<*, *> -> {
|
return doneFuture(Unit)
|
||||||
toStream.println("Snapshot")
|
|
||||||
toStream.println(response.snapshot)
|
|
||||||
response.updates
|
|
||||||
}
|
|
||||||
else -> return doneFuture(Unit)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val subscriber = PrintingSubscriber(printerFun, toStream)
|
if (response is DataFeed<*, *>) {
|
||||||
uncheckedCast(observable).subscribe(subscriber)
|
out.println("Snapshot:")
|
||||||
|
out.println(printerFun(response.snapshot))
|
||||||
|
out.flush()
|
||||||
|
out.println("Updates:")
|
||||||
|
return printNextElements(response.updates, printerFun, out)
|
||||||
|
}
|
||||||
|
return printNextElements(response as Observable<*>, printerFun, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
|
||||||
|
|
||||||
|
val subscriber = PrintingSubscriber(printerFun, out)
|
||||||
|
uncheckedCast(elements).subscribe(subscriber)
|
||||||
return subscriber.future
|
return subscriber.future
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user