Fixed the node shell to work with the DataFeed class

This commit is contained in:
Shams Asari
2017-10-13 17:28:05 +01:00
parent 6a00df322b
commit 3372e49898

View File

@ -7,23 +7,20 @@ import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.google.common.io.Closeables import com.google.common.io.Closeables
import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.*
import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.write
import net.corda.core.internal.*
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineUpdate import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.core.CordaException
import net.corda.node.internal.Node import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode import net.corda.node.internal.StartedNode
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
@ -200,7 +197,7 @@ object InteractiveShell {
} }
private fun createOutputMapper(factory: JsonFactory): ObjectMapper { private fun createOutputMapper(factory: JsonFactory): ObjectMapper {
return JacksonSupport.createNonRpcMapper(factory).apply({ return JacksonSupport.createNonRpcMapper(factory).apply {
// Register serializers for stateful objects from libraries that are special to the RPC system and don't // Register serializers for stateful objects from libraries that are special to the RPC system and don't
// make sense to print out to the screen. For classes we own, annotations can be used instead. // make sense to print out to the screen. For classes we own, annotations can be used instead.
val rpcModule = SimpleModule() val rpcModule = SimpleModule()
@ -210,7 +207,7 @@ object InteractiveShell {
disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
enable(SerializationFeature.INDENT_OUTPUT) enable(SerializationFeature.INDENT_OUTPUT)
}) }
} }
// TODO: This should become the default renderer rather than something used specifically by commands. // TODO: This should become the default renderer rather than something used specifically by commands.
@ -397,7 +394,7 @@ object InteractiveShell {
} }
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture<Unit>? { private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture<Unit>? {
val printerFun = { obj: Any? -> yamlMapper.writeValueAsString(obj) } val printerFun = yamlMapper::writeValueAsString
toStream.println(printerFun(response)) toStream.println(printerFun(response))
toStream.flush() toStream.flush()
return maybeFollow(response, printerFun, toStream) return maybeFollow(response, printerFun, toStream)
@ -443,13 +440,9 @@ object InteractiveShell {
val observable: Observable<*> = when (response) { val observable: Observable<*> = when (response) {
is Observable<*> -> response is Observable<*> -> response
is Pair<*, *> -> when { is DataFeed<*, *> -> response.updates
response.first is Observable<*> -> response.first as Observable<*> else -> return null
response.second is Observable<*> -> response.second as Observable<*>
else -> null
} }
else -> null
} ?: return null
val subscriber = PrintingSubscriber(printerFun, toStream) val subscriber = PrintingSubscriber(printerFun, toStream)
uncheckedCast(observable).subscribe(subscriber) uncheckedCast(observable).subscribe(subscriber)
@ -500,8 +493,8 @@ object InteractiveShell {
gen.writeString("<not saved>") gen.writeString("<not saved>")
} else { } else {
val path = Paths.get(toPath) val path = Paths.get(toPath)
path.write { value.copyTo(it) } value.copyTo(path)
gen.writeString("<saved to: $path>") gen.writeString("<saved to: ${path.toAbsolutePath()}>")
} }
} finally { } finally {
try { try {