[CORDA-1156]: Output of run networkMapFeed and run networkMapSnapshot not consistent in shell (fixed) (#2733)

This commit is contained in:
Michele Sollecito 2018-03-06 10:15:17 +00:00 committed by GitHub
parent 3a247f2966
commit 6479d7d8ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 20 deletions

View File

@ -40,6 +40,7 @@ buildscript {
ext.jackson_version = '2.9.3'
ext.jetty_version = '9.4.7.v20170914'
ext.jersey_version = '2.25'
ext.json_version = '20180130'
ext.assertj_version = '3.8.0'
ext.slf4j_version = '1.7.25'
ext.log4j_version = '2.9.1'

View File

@ -104,6 +104,7 @@ dependencies {
// Jackson support: serialisation to/from JSON, YAML, etc
compile project(':client:jackson')
compile group: 'org.json', name: 'json', version: json_version
// Coda Hale's Metrics: for monitoring of key statistics
compile "io.dropwizard.metrics:metrics-core:3.1.2"

View File

@ -1,6 +1,5 @@
package net.corda.node.shell
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.*
@ -15,6 +14,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
@ -22,7 +22,9 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.internal.security.AdminSubject
@ -52,6 +54,7 @@ import org.crsh.util.Utils
import org.crsh.vfs.FS
import org.crsh.vfs.spi.file.FileMountFactory
import org.crsh.vfs.spi.url.ClassPathMountFactory
import org.json.JSONObject
import org.slf4j.LoggerFactory
import rx.Observable
import rx.Subscriber
@ -196,13 +199,35 @@ object InteractiveShell {
}
}
private fun createOutputMapper(factory: JsonFactory): ObjectMapper {
return JacksonSupport.createNonRpcMapper(factory).apply {
private object NodeInfoSerializer : JsonSerializer<NodeInfo>() {
override fun serialize(nodeInfo: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) {
val json = JSONObject()
json["addresses"] = nodeInfo.addresses.map { address -> address.serialise() }
json["legalIdentities"] = nodeInfo.legalIdentities.map { address -> address.serialise() }
json["platformVersion"] = nodeInfo.platformVersion
json["serial"] = nodeInfo.serial
gen.writeRaw(json.toString())
}
private fun NetworkHostAndPort.serialise() = this.toString()
private fun Party.serialise() = JSONObject().put("name", this.name)
private operator fun JSONObject.set(key: String, value: Any?): JSONObject {
return put(key, value)
}
}
private fun createOutputMapper(): ObjectMapper {
return JacksonSupport.createNonRpcMapper().apply {
// Register serializers for stateful objects from libraries that are special to the RPC system and don't
// make sense to print out to the screen. For classes we own, annotations can be used instead.
val rpcModule = SimpleModule()
rpcModule.addSerializer(Observable::class.java, ObservableSerializer)
rpcModule.addSerializer(InputStream::class.java, InputStreamSerializer)
rpcModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer)
registerModule(rpcModule)
disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
@ -211,7 +236,7 @@ object InteractiveShell {
}
// TODO: This should become the default renderer rather than something used specifically by commands.
private val yamlMapper by lazy { createOutputMapper(YAMLFactory()) }
private val outputMapper by lazy { createOutputMapper() }
/**
* Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out
@ -394,11 +419,19 @@ object InteractiveShell {
return result
}
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture<Unit> {
val printerFun = yamlMapper::writeValueAsString
toStream.println(printerFun(response))
toStream.flush()
return maybeFollow(response, printerFun, toStream)
private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture<Unit> {
val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) }
val mappingFunction: (Any?) -> String = { value ->
if (value is Collection<*>) {
value.joinToString(",${System.lineSeparator()} ", "[${System.lineSeparator()} ", "${System.lineSeparator()}]") { element ->
mapElement(element)
}
} else {
mapElement(value)
}
}
return maybeFollow(response, mappingFunction, out)
}
private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() {
@ -421,6 +454,7 @@ object InteractiveShell {
override fun onNext(t: Any?) {
count++
toStream.println("Observation $count: " + printerFun(t))
toStream.flush()
}
@Synchronized
@ -431,25 +465,32 @@ object InteractiveShell {
}
}
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CordaFuture<Unit> {
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
// object graphs that contain yet more observables. So we just look for top level responses that follow
// the standard "track" pattern, and print them until the user presses Ctrl-C
if (response == null) return doneFuture(Unit)
val observable: Observable<*> = when (response) {
is Observable<*> -> response
is DataFeed<*, *> -> {
toStream.println("Snapshot")
toStream.println(response.snapshot)
response.updates
}
else -> return doneFuture(Unit)
if (response is DataFeed<*, *>) {
out.println("Snapshot:")
out.println(printerFun(response.snapshot))
out.flush()
out.println("Updates:")
return printNextElements(response.updates, printerFun, out)
}
if (response is Observable<*>) {
return printNextElements(response, printerFun, out)
}
val subscriber = PrintingSubscriber(printerFun, toStream)
uncheckedCast(observable).subscribe(subscriber)
out.println(printerFun(response))
return doneFuture(Unit)
}
private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
val subscriber = PrintingSubscriber(printerFun, out)
uncheckedCast(elements).subscribe(subscriber)
return subscriber.future
}