Shell: add support for uploading and downloading attachments.

This commit is contained in:
Mike Hearn
2017-03-21 12:13:37 +01:00
parent 347224c900
commit ac90fe724e
10 changed files with 630 additions and 148 deletions

View File

@ -2,16 +2,21 @@ package net.corda.node
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.google.common.io.Closeables
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.div
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.rootCause
import net.corda.core.then
import net.corda.core.utilities.Emoji
import net.corda.core.write
import net.corda.jackson.JacksonSupport
import net.corda.jackson.StringToMethodCallParser
import net.corda.node.internal.Node
@ -35,16 +40,16 @@ import org.crsh.vfs.spi.file.FileMountFactory
import org.crsh.vfs.spi.url.ClassPathMountFactory
import rx.Observable
import rx.Subscriber
import java.io.FileDescriptor
import java.io.FileInputStream
import java.io.PrintWriter
import java.io.*
import java.lang.reflect.Constructor
import java.lang.reflect.InvocationTargetException
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.logging.Level
import java.util.logging.Logger
import kotlin.concurrent.thread
@ -52,7 +57,6 @@ import kotlin.concurrent.thread
// TODO: Add command history.
// TODO: Command completion.
// TODO: Find a way to inject this directly into CRaSH as a command, without needing JIT source compilation.
// TODO: Add serialisers for InputStream so attachments can be uploaded through the shell.
// TODO: Do something sensible with commands that return a future.
// TODO: Configure default renderers, send objects down the pipeline, add commands to do json/xml/yaml outputs.
// TODO: Add a command to view last N lines/tail/control log4j2 loggers.
@ -76,7 +80,7 @@ object InteractiveShell {
Logger.getLogger("").level = Level.OFF // TODO: Is this really needed?
val classpathDriver = ClassPathMountFactory(Thread.currentThread().contextClassLoader)
val fileDriver = FileMountFactory(Utils.getCurrentDirectory());
val fileDriver = FileMountFactory(Utils.getCurrentDirectory())
val extraCommandsPath = (dir / "shell-commands").toAbsolutePath()
Files.createDirectories(extraCommandsPath)
@ -158,12 +162,10 @@ object InteractiveShell {
private val yamlInputMapper: ObjectMapper by lazy {
// Return a standard Corda Jackson object mapper, configured to use YAML by default and with extra
// serializers.
JacksonSupport.createInMemoryMapper(node.services.identityService, YAMLFactory())
}
private object ObservableSerializer : JsonSerializer<Observable<*>>() {
override fun serialize(value: Observable<*>, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeString("(observable)")
JacksonSupport.createInMemoryMapper(node.services.identityService, YAMLFactory()).apply {
val rpcModule = SimpleModule()
rpcModule.addDeserializer(InputStream::class.java, InputStreamDeserializer)
registerModule(rpcModule)
}
}
@ -171,8 +173,9 @@ object InteractiveShell {
return JacksonSupport.createNonRpcMapper(factory).apply({
// Register serializers for stateful objects from libraries that are special to the RPC system and don't
// make sense to print out to the screen. For classes we own, annotations can be used instead.
val rpcModule = SimpleModule("RPC module")
val rpcModule = SimpleModule()
rpcModule.addSerializer(Observable::class.java, ObservableSerializer)
rpcModule.addSerializer(InputStream::class.java, InputStreamSerializer)
registerModule(rpcModule)
disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
@ -222,6 +225,8 @@ object InteractiveShell {
} catch(e: NoApplicableConstructor) {
output.println("No matching constructor found:", Color.red)
e.errors.forEach { output.println("- $it", Color.red) }
} finally {
InputStreamDeserializer.closeAll()
}
}
@ -283,7 +288,54 @@ object InteractiveShell {
}
@JvmStatic
fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CompletableFuture<Unit>? {
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>): Any? {
val parser = StringToMethodCallParser(CordaRPCOps::class.java, context.attributes["mapper"] as ObjectMapper)
val cmd = input.joinToString(" ").trim({ it <= ' ' })
if (cmd.toLowerCase().startsWith("startflow")) {
// The flow command provides better support and startFlow requires special handling anyway due to
// the generic startFlow RPC interface which offers no type information with which to parse the
// string form of the command.
out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow)
return null
}
var result: Any? = null
try {
InputStreamSerializer.invokeContext = context
val call = parser.parse(context.attributes["ops"] as CordaRPCOps, cmd)
result = call.call()
if (result != null && result !is kotlin.Unit && result !is Void) {
result = printAndFollowRPCResponse(result, out)
}
if (result is Future<*>) {
if (!result.isDone) {
out.println("Waiting for completion or Ctrl-C ... ")
out.flush()
}
try {
result = result.get()
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
} catch (e: ExecutionException) {
throw RuntimeException(e.rootCause)
} catch (e: InvocationTargetException) {
throw RuntimeException(e.rootCause)
}
}
} catch (e: StringToMethodCallParser.UnparseableCallException) {
out.println(e.message, Color.red)
out.println("Please try 'man run' to learn what syntax is acceptable")
} catch (e: Exception) {
out.println("RPC failed: ${e.rootCause}", Color.red)
} finally {
InputStreamSerializer.invokeContext = null
InputStreamDeserializer.closeAll()
}
return result
}
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): ListenableFuture<Unit>? {
val printerFun = { obj: Any? -> yamlMapper.writeValueAsString(obj) }
toStream.println(printerFun(response))
toStream.flush()
@ -291,13 +343,13 @@ object InteractiveShell {
}
private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() {
private var count = 0;
val future = CompletableFuture<Unit>()
private var count = 0
val future = SettableFuture.create<Unit>()!!
init {
// The future is public and can be completed by something else to indicate we don't wish to follow
// anymore (e.g. the user pressing Ctrl-C).
future.thenAccept {
future then {
if (!isUnsubscribed)
unsubscribe()
}
@ -306,7 +358,7 @@ object InteractiveShell {
@Synchronized
override fun onCompleted() {
toStream.println("Observable has completed")
future.complete(Unit)
future.set(Unit)
}
@Synchronized
@ -320,13 +372,13 @@ object InteractiveShell {
override fun onError(e: Throwable) {
toStream.println("Observable completed with an error")
e.printStackTrace()
future.completeExceptionally(e)
future.setException(e)
}
}
// Kotlin bug: USELESS_CAST warning is generated below but the IDE won't let us remove it.
@Suppress("USELESS_CAST", "UNCHECKED_CAST")
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CompletableFuture<Unit>? {
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): SettableFuture<Unit>? {
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
// object graphs that contain yet more observables. So we just look for top level responses that follow
@ -347,4 +399,63 @@ object InteractiveShell {
(observable as Observable<Any>).subscribe(subscriber)
return subscriber.future
}
//region Extra serializers
//
// These serializers are used to enable the user to specify objects that aren't natural data containers in the shell,
// and for the shell to print things out that otherwise wouldn't be usefully printable.
private object ObservableSerializer : JsonSerializer<Observable<*>>() {
override fun serialize(value: Observable<*>, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeString("(observable)")
}
}
// A file name is deserialized to an InputStream if found.
object InputStreamDeserializer : JsonDeserializer<InputStream>() {
// Keep track of them so we can close them later.
private val streams = Collections.synchronizedSet(HashSet<InputStream>())
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): InputStream {
val stream = object : FilterInputStream(BufferedInputStream(Files.newInputStream(Paths.get(p.text)))) {
override fun close() {
super.close()
streams.remove(this)
}
}
streams += stream
return stream
}
fun closeAll() {
// Clone the set with toList() here so each closed stream can be removed from the set inside close().
streams.toList().forEach { Closeables.closeQuietly(it) }
}
}
// An InputStream found in a response triggers a request to the user to provide somewhere to save it.
private object InputStreamSerializer : JsonSerializer<InputStream>() {
var invokeContext: InvocationContext<*>? = null
override fun serialize(value: InputStream, gen: JsonGenerator, serializers: SerializerProvider) {
try {
val toPath = invokeContext!!.readLine("Path to save stream to (enter to ignore): ", true)
if (toPath == null || toPath.isBlank()) {
gen.writeString("<not saved>")
} else {
val path = Paths.get(toPath)
path.write { value.copyTo(it) }
gen.writeString("<saved to: $path>")
}
} finally {
try {
value.close()
} catch(e: IOException) {
// Ignore.
}
}
}
}
//endregion
}

View File

@ -117,9 +117,15 @@ class NodeAttachmentService(override var storePath: Path, dataSourceProperties:
// TODO: PLT-147: The attachment should be randomised to prevent brute force guessing and thus privacy leaks.
override fun importAttachment(jar: InputStream): SecureHash {
require(jar !is JarInputStream)
// Read the file into RAM, hashing it to find the ID as we go. The attachment must fit into memory.
// TODO: Switch to a two-phase insert so we can handle attachments larger than RAM.
// To do this we must pipe stream into the database without knowing its hash, which we will learn only once
// the insert/upload is complete. We can then query to see if it's a duplicate and if so, erase, and if not
// set the hash field of the new attachment record.
val hs = HashingInputStream(Hashing.sha256(), jar)
val bytes = hs.readBytes()
checkIsAValidJAR(hs)
checkIsAValidJAR(ByteArrayInputStream(bytes))
val id = SecureHash.SHA256(hs.hash().asBytes())
val count = session.withTransaction {
@ -147,7 +153,7 @@ class NodeAttachmentService(override var storePath: Path, dataSourceProperties:
val extractTo = storePath / "$id.jar"
try {
extractTo.createDirectory()
extractZipFile(hs, extractTo)
extractZipFile(ByteArrayInputStream(bytes), extractTo)
} catch(e: FileAlreadyExistsException) {
log.trace("Did not extract attachment jar to directory because it already exists")
} catch(e: Exception) {
@ -164,7 +170,7 @@ class NodeAttachmentService(override var storePath: Path, dataSourceProperties:
// Note that JarInputStream won't throw any kind of error at all if the file stream is in fact not
// a ZIP! It'll just pretend it's an empty archive, which is kind of stupid but that's how it works.
// So we have to check to ensure we found at least one item.
val jar = JarInputStream(stream)
val jar = JarInputStream(stream, true)
var count = 0
while (true) {
val cursor = jar.nextJarEntry ?: break

View File

@ -6,6 +6,7 @@ import org.crsh.cli.*;
import org.crsh.command.*;
import org.crsh.text.*;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.*;
@ -37,46 +38,7 @@ public class run extends InteractiveShellCommand {
return null;
}
String cmd = String.join(" ", command).trim();
if (cmd.toLowerCase().startsWith("startflow")) {
// The flow command provides better support and startFlow requires special handling anyway due to
// the generic startFlow RPC interface which offers no type information with which to parse the
// string form of the command.
out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow);
return null;
}
Object result = null;
try {
StringToMethodCallParser.ParsedMethodCall call = parser.parse(ops(), cmd);
result = call.call();
result = processResult(result);
} catch (StringToMethodCallParser.UnparseableCallException e) {
out.println(e.getMessage(), Color.red);
out.println("Please try 'man run' to learn what syntax is acceptable", Color.red);
}
return result;
}
private Object processResult(Object result) {
if (result != null && !(result instanceof kotlin.Unit) && !(result instanceof Void)) {
result = printAndFollowRPCResponse(result, out);
}
if (result instanceof Future) {
Future future = (Future) result;
if (!future.isDone()) {
out.println("Waiting for completion or Ctrl-C ... ");
out.flush();
}
try {
result = future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
return result;
return InteractiveShell.runRPCFromString(command, out, context);
}
private void emitHelp(InvocationContext<Map> context, StringToMethodCallParser<CordaRPCOps> parser) {