From d116b5e9f420420434e20b28a57f7f4fcb9c759c Mon Sep 17 00:00:00 2001 From: Katelyn Baker Date: Mon, 5 Mar 2018 18:05:32 +0000 Subject: [PATCH 1/8] CORDA-1140 - FOR RPC Client P2P context use AMQP (#2716) * CORDA-1140 - FOR RPC Client P2P context use AMQP * Review comments * Review comments * review comments * review comments --- .../internal/KryoClientSerializationScheme.kt | 4 +-- .../corda/core/utilities/KotlinUtilsTest.kt | 17 ----------- .../internal/serialization/ServerContexts.kt | 10 ++----- .../internal/serialization/SharedContexts.kt | 9 +----- .../amqp/AMQPSerializationScheme.kt | 28 +++++++++++++++++-- 5 files changed, 30 insertions(+), 38 deletions(-) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt index 67eab59f15..6132509e21 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt @@ -6,7 +6,7 @@ import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.core.serialization.internal.SerializationEnvironment import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.nodeSerializationEnv -import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT +import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme @@ -43,7 +43,7 @@ class KryoClientSerializationScheme : AbstractKryoSerializationScheme() { registerScheme(KryoClientSerializationScheme()) registerScheme(AMQPClientSerializationScheme(emptyList())) }, - KRYO_P2P_CONTEXT, + AMQP_P2P_CONTEXT, rpcClientContext = KRYO_RPC_CLIENT_CONTEXT) } } diff --git a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt index 89c4ee9092..f725babdf2 100644 --- a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt +++ b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt @@ -6,7 +6,6 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.nodeapi.internal.serialization.KRYO_CHECKPOINT_CONTEXT -import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT import net.corda.testing.core.SerializationEnvironmentRule import org.assertj.core.api.Assertions.assertThat import org.junit.Rule @@ -39,14 +38,6 @@ class KotlinUtilsTest { assertThat(copy.transientVal).isEqualTo(copyVal) } - @Test - fun `serialise transient property with non-capturing lambda`() { - expectedEx.expect(KryoException::class.java) - expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") - val original = NonCapturingTransientProperty() - original.serialize(context = KRYO_P2P_CONTEXT) - } - @Test fun `deserialise transient property with non-capturing lambda`() { expectedEx.expect(KryoException::class.java) @@ -66,14 +57,6 @@ class KotlinUtilsTest { assertThat(copy.transientVal).startsWith("Hello") } - @Test - fun `serialise transient property with capturing lambda`() { - expectedEx.expect(KryoException::class.java) - expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") - val original = CapturingTransientProperty("Hello") - original.serialize(context = KRYO_P2P_CONTEXT) - } - @Test fun `deserialise transient property with capturing lambda`() { expectedEx.expect(KryoException::class.java) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt index aa939c5002..c18d7f4228 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt @@ -29,14 +29,7 @@ val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(kryoMagic, true, SerializationContext.UseCase.RPCServer, null) -val KRYO_STORAGE_CONTEXT = SerializationContextImpl(kryoMagic, - SerializationDefaults.javaClass.classLoader, - AllButBlacklisted, - emptyMap(), - true, - SerializationContext.UseCase.Storage, - null, - AlwaysAcceptEncodingWhitelist) + val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, AllButBlacklisted, @@ -45,6 +38,7 @@ val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic, SerializationContext.UseCase.Storage, null, AlwaysAcceptEncodingWhitelist) + val AMQP_RPC_SERVER_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt index 25977e4c1e..e54c70cc69 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt @@ -16,14 +16,6 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic * CANNOT always be instantiated outside of the server and so * MUST be kept separate from these ones! */ - -val KRYO_P2P_CONTEXT = SerializationContextImpl(kryoMagic, - SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.P2P, - null) val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(kryoMagic, SerializationDefaults.javaClass.classLoader, QuasarWhitelist, @@ -32,6 +24,7 @@ val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(kryoMagic, SerializationContext.UseCase.Checkpoint, null, AlwaysAcceptEncodingWhitelist) + val AMQP_P2P_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index 8d07afcace..f099ce7af5 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -2,13 +2,16 @@ package net.corda.nodeapi.internal.serialization.amqp +import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner import net.corda.core.cordapp.Cordapp +import net.corda.core.internal.objectOrNewInstance import net.corda.core.serialization.* import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.core.utilities.ByteSequence import net.corda.nodeapi.internal.serialization.DefaultWhitelist import net.corda.nodeapi.internal.serialization.MutableClassWhitelist import net.corda.nodeapi.internal.serialization.SerializationScheme +import java.lang.reflect.Modifier import java.security.PublicKey import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -28,10 +31,20 @@ fun SerializerFactory.addToWhitelist(vararg types: Class<*>) { abstract class AbstractAMQPSerializationScheme(val cordappLoader: List) : SerializationScheme { + // TODO: This method of initialisation for the Whitelist and plugin serializers will have to change + // when we have per-cordapp contexts and dynamic app reloading but for now it's the easiest way companion object { private val serializationWhitelists: List by lazy { ServiceLoader.load(SerializationWhitelist::class.java, this::class.java.classLoader).toList() + DefaultWhitelist } + + private val customSerializers: List> by lazy { + FastClasspathScanner().addClassLoader(this::class.java.classLoader).scan() + .getNamesOfClassesImplementing(SerializationCustomSerializer::class.java) + .mapNotNull { this::class.java.classLoader.loadClass(it).asSubclass(SerializationCustomSerializer::class.java) } + .filterNot { Modifier.isAbstract(it.modifiers) } + .map { it.kotlin.objectOrNewInstance() } + } } private fun registerCustomSerializers(factory: SerializerFactory) { @@ -69,11 +82,20 @@ abstract class AbstractAMQPSerializationScheme(val cordappLoader: List) factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray()) } - for (loader in cordappLoader) { - for (schema in loader.serializationCustomSerializers) { - factory.registerExternal(CorDappCustomSerializer(schema, factory)) + // If we're passed in an external list we trust that, otherwise revert to looking at the scan of the + // classpath to find custom serializers. + if (cordappLoader.isEmpty()) { + for (customSerializer in customSerializers) { + factory.registerExternal(CorDappCustomSerializer(customSerializer, factory)) + } + } else { + cordappLoader.forEach { loader -> + for (customSerializer in loader.serializationCustomSerializers) { + factory.registerExternal(CorDappCustomSerializer(customSerializer, factory)) + } } } + } private val serializerFactoriesForContexts = ConcurrentHashMap, SerializerFactory>() From 5efea22a9bb9b3a869222c251490df8ef2a11a04 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Mon, 5 Mar 2018 19:10:48 +0000 Subject: [PATCH 2/8] Network Bootstrap changes to master (#2736) * Added exclude whitelist to Network Bootstrapper to enable fine grained testing (#2666) * Added exclude whitelist to Network Bootstrapper to enable fine grained testing. * code review change (cherry picked from commit d4f9b10) * CORDA-1150 - better log messages (#2721) * CORDA-1150 better log messages * CORDA-1150 better log messages * CORDA-1150 better log messages (cherry picked from commit 87c5ad8) * fixed merge * remove unused function --- .../internal/network/NetworkBootstrapper.kt | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index 2e1598d197..b554ed9c50 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -53,6 +53,7 @@ class NetworkBootstrapper { private const val LOGS_DIR_NAME = "logs" private const val WHITELIST_FILE_NAME = "whitelist.txt" + private const val EXCLUDE_WHITELIST_FILE_NAME = "exclude_whitelist.txt" @JvmStatic fun main(args: Array) { @@ -79,7 +80,7 @@ class NetworkBootstrapper { println("Gathering notary identities") val notaryInfos = gatherNotaryInfos(nodeInfoFiles) println("Notary identities to be used in network parameters: ${notaryInfos.joinToString("; ") { it.prettyPrint() }}") - val mergedWhiteList = generateWhitelist(directory / WHITELIST_FILE_NAME, cordapps?.distinct()) + val mergedWhiteList = generateWhitelist(directory / WHITELIST_FILE_NAME, directory / EXCLUDE_WHITELIST_FILE_NAME, cordapps?.distinct()) println("Updating whitelist") overwriteWhitelist(directory / WHITELIST_FILE_NAME, mergedWhiteList) installNetworkParameters(notaryInfos, nodeDirs, mergedWhiteList) @@ -187,21 +188,24 @@ class NetworkBootstrapper { nodeDirs.forEach { copier.install(it) } } - private fun generateWhitelist(whitelistFile: Path, cordapps: List?): Map> { + private fun generateWhitelist(whitelistFile: Path, excludeWhitelistFile: Path, cordapps: List?): Map> { val existingWhitelist = if (whitelistFile.exists()) readContractWhitelist(whitelistFile) else emptyMap() - println("Found existing whitelist:") - existingWhitelist.forEach { println(it.outputString()) } + println(if (existingWhitelist.isEmpty()) "No existing whitelist file found." else "Found existing whitelist: ${whitelistFile}") - val newWhiteList: Map = cordapps?.flatMap { cordappJarPath -> + val excludeContracts = if (excludeWhitelistFile.exists()) readExcludeWhitelist(excludeWhitelistFile) else emptyList() + if (excludeContracts.isNotEmpty()) { + println("Exclude contracts from whitelist: ${excludeContracts.joinToString()}}") + } + + val newWhiteList = cordapps?.flatMap { cordappJarPath -> val jarHash = getJarHash(cordappJarPath) scanJarForContracts(cordappJarPath).map { contract -> contract to jarHash } - }?.toMap() ?: emptyMap() + }?.filter { (contractClassName, _) -> contractClassName !in excludeContracts }?.toMap() ?: emptyMap() - println("Calculating whitelist for current CorDapps:") - newWhiteList.forEach { (contract, attachment) -> println("$contract:$attachment") } + println("Calculating whitelist for current installed CorDapps..") val merged = (newWhiteList.keys + existingWhitelist.keys).map { contractClassName -> val existing = existingWhitelist[contractClassName] ?: emptyList() @@ -209,15 +213,15 @@ class NetworkBootstrapper { contractClassName to (if (newHash == null || newHash in existing) existing else existing + newHash) }.toMap() - println("Final whitelist:") - merged.forEach { println(it.outputString()) } - + println("CorDapp whitelist " + (if (existingWhitelist.isEmpty()) "generated" else "updated") + " in ${whitelistFile}") return merged } private fun overwriteWhitelist(whitelistFile: Path, mergedWhiteList: Map>) { PrintStream(whitelistFile.toFile().outputStream()).use { out -> - mergedWhiteList.forEach { out.println(it.outputString()) } + mergedWhiteList.forEach { (contract, attachments) -> + out.println("${contract}:${attachments.joinToString(",")}") + } } } @@ -227,12 +231,14 @@ class NetworkBootstrapper { SecureHash.SHA256(hs.hash().asBytes()) } - private fun readContractWhitelist(file: Path): Map> = file.toFile().readLines() + private fun readContractWhitelist(file: Path): Map> = file.readAllLines() .map { line -> line.split(":") } .map { (contract, attachmentIds) -> contract to (attachmentIds.split(",").map(::parse)) }.toMap() + private fun readExcludeWhitelist(file: Path): List = file.readAllLines().map(String::trim) + private fun NotaryInfo.prettyPrint(): String = "${identity.name} (${if (validating) "" else "non-"}validating)" private fun NodeInfo.notaryIdentity(): Party { @@ -246,8 +252,6 @@ class NetworkBootstrapper { } } - private fun Map.Entry>.outputString() = "$key:${value.joinToString(",")}" - // We need to to set serialization env, because generation of parameters is run from Cordform. // KryoServerSerializationScheme is not accessible from nodeapi. private fun initialiseSerialization() { From 3a247f29667c02bd8568faaa249a2206584f2f01 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Mon, 5 Mar 2018 19:11:04 +0000 Subject: [PATCH 3/8] CORDA-1147 - fix shell (#2730) (#2737) --- .../src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt index 28dfa8d086..bb58a0d1e3 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt @@ -182,6 +182,8 @@ class ProgressTracker(vararg steps: Step) { fun endWithError(error: Throwable) { check(!hasEnded) { "Progress tracker has already ended" } _changes.onError(error) + _stepsTreeIndexChanges.onError(error) + _stepsTreeChanges.onError(error) } /** The parent of this tracker: set automatically by the parent when a tracker is added as a child */ From 6479d7d8ff0511d44de3f48dac361421a4f93d90 Mon Sep 17 00:00:00 2001 From: Michele Sollecito Date: Tue, 6 Mar 2018 10:15:17 +0000 Subject: [PATCH 4/8] [CORDA-1156]: Output of run networkMapFeed and run networkMapSnapshot not consistent in shell (fixed) (#2733) --- build.gradle | 1 + node/build.gradle | 1 + .../net/corda/node/shell/InteractiveShell.kt | 81 ++++++++++++++----- 3 files changed, 63 insertions(+), 20 deletions(-) diff --git a/build.gradle b/build.gradle index e146adba48..473e047253 100644 --- a/build.gradle +++ b/build.gradle @@ -40,6 +40,7 @@ buildscript { ext.jackson_version = '2.9.3' ext.jetty_version = '9.4.7.v20170914' ext.jersey_version = '2.25' + ext.json_version = '20180130' ext.assertj_version = '3.8.0' ext.slf4j_version = '1.7.25' ext.log4j_version = '2.9.1' diff --git a/node/build.gradle b/node/build.gradle index 13dedeea6a..be7f3ee027 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -104,6 +104,7 @@ dependencies { // Jackson support: serialisation to/from JSON, YAML, etc compile project(':client:jackson') + compile group: 'org.json', name: 'json', version: json_version // Coda Hale's Metrics: for monitoring of key statistics compile "io.dropwizard.metrics:metrics-core:3.1.2" diff --git a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt index 57fb06dee1..8cb160b908 100644 --- a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt +++ b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt @@ -1,6 +1,5 @@ package net.corda.node.shell -import com.fasterxml.jackson.core.JsonFactory import com.fasterxml.jackson.core.JsonGenerator import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.databind.* @@ -15,6 +14,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.UniqueIdentifier import net.corda.core.flows.FlowLogic import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party import net.corda.core.internal.* import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture @@ -22,7 +22,9 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.DataFeed import net.corda.core.messaging.FlowProgressHandle import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.node.NodeInfo import net.corda.core.node.services.IdentityService +import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.Node import net.corda.node.internal.StartedNode import net.corda.node.internal.security.AdminSubject @@ -52,6 +54,7 @@ import org.crsh.util.Utils import org.crsh.vfs.FS import org.crsh.vfs.spi.file.FileMountFactory import org.crsh.vfs.spi.url.ClassPathMountFactory +import org.json.JSONObject import org.slf4j.LoggerFactory import rx.Observable import rx.Subscriber @@ -196,13 +199,35 @@ object InteractiveShell { } } - private fun createOutputMapper(factory: JsonFactory): ObjectMapper { - return JacksonSupport.createNonRpcMapper(factory).apply { + private object NodeInfoSerializer : JsonSerializer() { + + override fun serialize(nodeInfo: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) { + + val json = JSONObject() + json["addresses"] = nodeInfo.addresses.map { address -> address.serialise() } + json["legalIdentities"] = nodeInfo.legalIdentities.map { address -> address.serialise() } + json["platformVersion"] = nodeInfo.platformVersion + json["serial"] = nodeInfo.serial + gen.writeRaw(json.toString()) + } + + private fun NetworkHostAndPort.serialise() = this.toString() + private fun Party.serialise() = JSONObject().put("name", this.name) + + private operator fun JSONObject.set(key: String, value: Any?): JSONObject { + return put(key, value) + } + } + + private fun createOutputMapper(): ObjectMapper { + + return JacksonSupport.createNonRpcMapper().apply { // Register serializers for stateful objects from libraries that are special to the RPC system and don't // make sense to print out to the screen. For classes we own, annotations can be used instead. val rpcModule = SimpleModule() rpcModule.addSerializer(Observable::class.java, ObservableSerializer) rpcModule.addSerializer(InputStream::class.java, InputStreamSerializer) + rpcModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer) registerModule(rpcModule) disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) @@ -211,7 +236,7 @@ object InteractiveShell { } // TODO: This should become the default renderer rather than something used specifically by commands. - private val yamlMapper by lazy { createOutputMapper(YAMLFactory()) } + private val outputMapper by lazy { createOutputMapper() } /** * Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out @@ -394,11 +419,19 @@ object InteractiveShell { return result } - private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture { - val printerFun = yamlMapper::writeValueAsString - toStream.println(printerFun(response)) - toStream.flush() - return maybeFollow(response, printerFun, toStream) + private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture { + + val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } + val mappingFunction: (Any?) -> String = { value -> + if (value is Collection<*>) { + value.joinToString(",${System.lineSeparator()} ", "[${System.lineSeparator()} ", "${System.lineSeparator()}]") { element -> + mapElement(element) + } + } else { + mapElement(value) + } + } + return maybeFollow(response, mappingFunction, out) } private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber() { @@ -421,6 +454,7 @@ object InteractiveShell { override fun onNext(t: Any?) { count++ toStream.println("Observation $count: " + printerFun(t)) + toStream.flush() } @Synchronized @@ -431,25 +465,32 @@ object InteractiveShell { } } - private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CordaFuture { + private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C if (response == null) return doneFuture(Unit) - val observable: Observable<*> = when (response) { - is Observable<*> -> response - is DataFeed<*, *> -> { - toStream.println("Snapshot") - toStream.println(response.snapshot) - response.updates - } - else -> return doneFuture(Unit) + if (response is DataFeed<*, *>) { + out.println("Snapshot:") + out.println(printerFun(response.snapshot)) + out.flush() + out.println("Updates:") + return printNextElements(response.updates, printerFun, out) + } + if (response is Observable<*>) { + return printNextElements(response, printerFun, out) } - val subscriber = PrintingSubscriber(printerFun, toStream) - uncheckedCast(observable).subscribe(subscriber) + out.println(printerFun(response)) + return doneFuture(Unit) + } + + private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { + + val subscriber = PrintingSubscriber(printerFun, out) + uncheckedCast(elements).subscribe(subscriber) return subscriber.future } From f1856f0146072558509dc734d926b3acdb11013e Mon Sep 17 00:00:00 2001 From: Konstantinos Chalkias Date: Tue, 6 Mar 2018 10:28:10 +0000 Subject: [PATCH 5/8] CORDA-1145 - troubleshooting updates (#2738) --- docs/source/troubleshooting.rst | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/docs/source/troubleshooting.rst b/docs/source/troubleshooting.rst index 4176f04ed3..8ec877757d 100644 --- a/docs/source/troubleshooting.rst +++ b/docs/source/troubleshooting.rst @@ -77,6 +77,25 @@ If IDEA refuses to open a project because an SDK has not been selected, you may If you are having trouble selecting the correct JDK, the JetBrains website provides the `following guidelines `_. +IDEA complains about JVM target +******************************* + +If you receive a ``Cannot inline bytecode build with JVM target 1.8 into bytecode...``, please ensure JDK is not +outdated and check that + +.. parsed-literal:: + + Settings/Build, Execution, Deployment/Compiler/Kotlin Compiler/Target JVM Version=1.8 + +.. + +IDEA red-lining - Unresolved reference: function +************************************************ + +If you are running under an outdated SDK, IntelliJ will not complain about lack of an SDK, but you might notice +some functions are red-lined and compilation fails. In this case you should update you SDK, see JetBrains website +`following guidelines `_. + IDEA fails to compile Corda because it refuses to find some dependencies ************************************************************************ @@ -100,7 +119,7 @@ or checking the .. parsed-literal:: - Settings/Build,Execution,Deployment/Build Tools/Gradle/Runner/Delegate IDE build-run actions to gradle + Settings/Build, Execution, Deployment/Build Tools/Gradle/Runner/Delegate IDE build-run actions to gradle .. From 81f4bbcaf3bc1ca36738aceab11bc2286e6fd0da Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Tue, 6 Mar 2018 10:49:29 +0000 Subject: [PATCH 6/8] Fix flaky bridge test and an associated deadlock during rollback in the BridgeManager code. (#2739) --- .../internal/bridging/AMQPBridgeManager.kt | 56 ++++++++--------- .../net/corda/node/amqp/AMQPBridgeTest.kt | 62 +++++++++++++------ 2 files changed, 69 insertions(+), 49 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 94ddb681ac..13b73bf3b5 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -126,37 +126,35 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa } private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { - lock.withLock { - val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) } - val properties = HashMap() - for (key in artemisMessage.propertyNames) { - var value = artemisMessage.getObjectProperty(key) - if (value is SimpleString) { - value = value.toString() - } - properties[key.toString()] = value + val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) } + val properties = HashMap() + for (key in artemisMessage.propertyNames) { + var value = artemisMessage.getObjectProperty(key) + if (value is SimpleString) { + value = value.toString() } - log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } - val peerInbox = translateLocalQueueToInboxAddress(queueName) - val sendableMessage = amqpClient.createMessage(data, peerInbox, - legalNames.first().toString(), - properties) - sendableMessage.onComplete.then { - log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" } - lock.withLock { - if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { - artemisMessage.acknowledge() - } else { - log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") - // We need to commit any acknowledged messages before rolling back the failed - // (unacknowledged) message. - session?.commit() - session?.rollback(false) - } - } - } - amqpClient.write(sendableMessage) + properties[key.toString()] = value } + log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } + val peerInbox = translateLocalQueueToInboxAddress(queueName) + val sendableMessage = amqpClient.createMessage(data, peerInbox, + legalNames.first().toString(), + properties) + sendableMessage.onComplete.then { + log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" } + lock.withLock { + if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { + artemisMessage.acknowledge() + } else { + log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") + // We need to commit any acknowledged messages before rolling back the failed + // (unacknowledged) message. + session?.commit() + session?.rollback(false) + } + } + } + amqpClient.write(sendableMessage) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 7238069b51..38804f56d2 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -5,6 +5,7 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.core.crypto.toStringShort import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.loggerFor import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate @@ -25,13 +26,14 @@ import org.junit.Test import org.junit.rules.TemporaryFolder import java.util.* import kotlin.test.assertEquals -import kotlin.test.assertNotEquals class AMQPBridgeTest { @Rule @JvmField val temporaryFolder = TemporaryFolder() + private val log = loggerFor() + private val ALICE = TestIdentity(ALICE_NAME) private val BOB = TestIdentity(BOB_NAME) @@ -64,14 +66,16 @@ class AMQPBridgeTest { //Create target server val amqpServer = createAMQPServer() + val dedupeSet = mutableSetOf() val receive = amqpServer.onReceive.toBlocking().iterator amqpServer.start() val receivedSequence = mutableListOf() + val atNodeSequence = mutableListOf() fun formatMessage(expected: String, actual: Int, received: List): String { - return "Expected message with id $expected, got $actual, previous message receive sequence: " + return "Expected message with id $expected, got $actual, previous message receive sequence: " + "${received.joinToString(", ", "[", "]")}." } @@ -79,66 +83,84 @@ class AMQPBridgeTest { val messageID1 = received1.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID1".toByteArray(), received1.payload) assertEquals(0, messageID1) + dedupeSet += received1.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String received1.complete(true) // Accept first message - receivedSequence.add(messageID1) + receivedSequence += messageID1 + atNodeSequence += messageID1 val received2 = receive.next() val messageID2 = received2.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID2".toByteArray(), received2.payload) assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence)) - received2.complete(false) // Reject message - receivedSequence.add(messageID2) + received2.complete(false) // Reject message and don't add to dedupe + receivedSequence += messageID2 // reflects actual sequence + // drop things until we get back to the replay while (true) { val received3 = receive.next() val messageID3 = received3.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID3".toByteArray(), received3.payload) - assertNotEquals(0, messageID3, formatMessage("< 1", messageID3, receivedSequence)) - receivedSequence.add(messageID3) + receivedSequence += messageID3 if (messageID3 != 1) { // keep rejecting any batched items following rejection received3.complete(false) } else { // beginnings of replay so accept again received3.complete(true) + val messageId = received3.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String + if (messageId !in dedupeSet) { + dedupeSet += messageId + atNodeSequence += messageID3 + } break } } + // start receiving again, but discarding duplicates while (true) { val received4 = receive.next() val messageID4 = received4.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID4".toByteArray(), received4.payload) - receivedSequence.add(messageID4) - if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip - assertEquals(2, messageID4) // next message should be in order though - break + receivedSequence += messageID4 + val messageId = received4.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String + if (messageId !in dedupeSet) { + dedupeSet += messageId + atNodeSequence += messageID4 } received4.complete(true) + if (messageID4 == 2) { // started to replay messages after rejection point + break + } } // Send a fresh item and check receive val artemisMessage = artemis.session.createMessage(true).apply { - putIntProperty("CountProp", -1) - writeBodyBufferBytes("Test_end".toByteArray()) + putIntProperty("CountProp", 3) + writeBodyBufferBytes("Test3".toByteArray()) // Use the magic deduplication property built into Artemis as our message identity too putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) } artemis.producer.send(sourceQueueName, artemisMessage) + // start receiving again, discarding duplicates while (true) { val received5 = receive.next() val messageID5 = received5.applicationProperties["CountProp"] as Int - if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip - assertEquals(-1, messageID5, formatMessage("-1", messageID5, receivedSequence)) // next message should be in order though - assertArrayEquals("Test_end".toByteArray(), received5.payload) - receivedSequence.add(messageID5) + assertArrayEquals("Test$messageID5".toByteArray(), received5.payload) + receivedSequence += messageID5 + val messageId = received5.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String + if (messageId !in dedupeSet) { + dedupeSet += messageId + atNodeSequence += messageID5 + } + received5.complete(true) + if (messageID5 == 3) { // reached our fresh message break } - receivedSequence.add(messageID5) - received5.complete(true) } - println("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}") + log.info("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}") + log.info("Deduped sequence: ${atNodeSequence.joinToString(", ", "[", "]")}") + assertEquals(listOf(0, 1, 2, 3), atNodeSequence) bridgeManager.stop() amqpServer.stop() artemisClient.stop() From 65bfc833d346357ccc0ca8d5b5fb5190b2ab598f Mon Sep 17 00:00:00 2001 From: Konstantinos Chalkias Date: Tue, 6 Mar 2018 11:12:25 +0000 Subject: [PATCH 7/8] CORDA-1173 - SwapIdentityFlow doc fixes (#2740) --- .../net/corda/confidential/IdentitySyncFlow.kt | 16 ++++++++-------- .../net/corda/confidential/SwapIdentitiesFlow.kt | 16 ++++++++-------- docs/source/api-identity.rst | 13 +++++-------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/confidential-identities/src/main/kotlin/net/corda/confidential/IdentitySyncFlow.kt b/confidential-identities/src/main/kotlin/net/corda/confidential/IdentitySyncFlow.kt index bffff0abf1..d9aac047da 100644 --- a/confidential-identities/src/main/kotlin/net/corda/confidential/IdentitySyncFlow.kt +++ b/confidential-identities/src/main/kotlin/net/corda/confidential/IdentitySyncFlow.kt @@ -12,14 +12,14 @@ import net.corda.core.utilities.unwrap object IdentitySyncFlow { /** - * Flow for ensuring that our counterparties in a transaction have the full certificate paths for *our* confidential - * identities used in states present in the transaction. This is intended for use as a subflow of another flow, typically between - * transaction assembly and signing. An example of where this is useful is where a recipient of a [Cash] state wants - * to know that it is being paid by the correct party, and the owner of the state is a confidential identity of that - * party. This flow would send a copy of the confidential identity path to the recipient, enabling them to verify that - * identity. + * Flow for ensuring that our counter-parties in a transaction have the full certificate paths for *our* + * confidential identities used in states present in the transaction. This is intended for use as a sub-flow of + * another flow, typically between transaction assembly and signing. An example of where this is useful is where + * a recipient of a state wants to know that it is being paid by the correct party, and the owner of the state is a + * confidential identity of that party. This flow would send a copy of the confidential identity path to the + * recipient, enabling them to verify that identity. */ - // TODO: Can this be triggered automatically from [SendTransactionFlow] + // TODO: Can this be triggered automatically from [SendTransactionFlow]? class Send(val otherSideSessions: Set, val tx: WireTransaction, override val progressTracker: ProgressTracker) : FlowLogic() { @@ -81,7 +81,7 @@ object IdentitySyncFlow { override val progressTracker: ProgressTracker = ProgressTracker(RECEIVING_IDENTITIES, RECEIVING_CERTIFICATES) @Suspendable - override fun call(): Unit { + override fun call() { progressTracker.currentStep = RECEIVING_IDENTITIES val allIdentities = otherSideSession.receive>().unwrap { it } val unknownIdentities = allIdentities.filter { serviceHub.identityService.wellKnownPartyFromAnonymous(it) == null } diff --git a/confidential-identities/src/main/kotlin/net/corda/confidential/SwapIdentitiesFlow.kt b/confidential-identities/src/main/kotlin/net/corda/confidential/SwapIdentitiesFlow.kt index f98970c322..ac2ffe15df 100644 --- a/confidential-identities/src/main/kotlin/net/corda/confidential/SwapIdentitiesFlow.kt +++ b/confidential-identities/src/main/kotlin/net/corda/confidential/SwapIdentitiesFlow.kt @@ -23,7 +23,7 @@ import java.util.* /** * Very basic flow which generates new confidential identities for parties in a transaction and exchanges the transaction - * key and certificate paths between the parties. This is intended for use as a subflow of another flow which builds a + * key and certificate paths between the parties. This is intended for use as a sub-flow of another flow which builds a * transaction. */ @StartableByRPC @@ -38,7 +38,7 @@ class SwapIdentitiesFlow(private val otherParty: Party, fun tracker() = ProgressTracker(AWAITING_KEY) /** - * Generate the determinstic data blob the confidential identity's key holder signs to indicate they want to + * Generate the deterministic data blob the confidential identity's key holder signs to indicate they want to * represent the subject named in the X.509 certificate. Note that this is never actually sent between nodes, * but only the signature is sent. The blob is built independently on each node and the received signature * verified against the expected blob, rather than exchanging the blob. @@ -64,7 +64,7 @@ class SwapIdentitiesFlow(private val otherParty: Party, throw SwapIdentitiesException("Signature does not match the expected identity ownership assertion.", ex) } // Validate then store their identity so that we can prove the key in the transaction is owned by the - // counterparty. + // counter-party. identityService.verifyAndRegisterIdentity(anonymousOtherSide) return anonymousOtherSide } @@ -90,8 +90,8 @@ class SwapIdentitiesFlow(private val otherParty: Party, val confidentialIdentity: PartyAndCertificate = confidentialIdentityBytes.bytes.deserialize() validateAndRegisterIdentity(serviceHub.identityService, otherParty, confidentialIdentity, theirSigBytes) } - identities.put(ourIdentity, legalIdentityAnonymous.party.anonymise()) - identities.put(otherParty, anonymousOtherSide.party.anonymise()) + identities[ourIdentity] = legalIdentityAnonymous.party.anonymise() + identities[otherParty] = anonymousOtherSide.party.anonymise() } return identities } @@ -101,9 +101,9 @@ class SwapIdentitiesFlow(private val otherParty: Party, } /** - * Data class used only in the context of asserting the owner of the private key for the listed key wants to use it - * to represent the named entity. This is pairs with an X.509 certificate (which asserts the signing identity says - * the key represents the named entity), but protects against a certificate authority incorrectly claiming others' + * Data class used only in the context of asserting that the owner of the private key for the listed key wants to use it + * to represent the named entity. This is paired with an X.509 certificate (which asserts the signing identity says + * the key represents the named entity) and protects against a malicious party incorrectly claiming others' * keys. */ @CordaSerializable diff --git a/docs/source/api-identity.rst b/docs/source/api-identity.rst index 77f6896c99..0ace5dd36d 100644 --- a/docs/source/api-identity.rst +++ b/docs/source/api-identity.rst @@ -74,19 +74,16 @@ You can see an example of using ``SwapIdentitiesFlow`` in ``TwoPartyDealFlow.kt` ``SwapIdentitiesFlow`` goes through the following key steps: -1. Generate a nonce value to form a challenge to the other nodes -2. Send nonce value to all counterparties, and receive their nonce values -3. Generate a new confidential identity from our well-known identity -4. Create a data blob containing the new confidential identity (public key, name and X.509 certificate path), - and the hash of the nonce values -5. Sign the resulting data blob with the confidential identity's private key -6. Send the confidential identity and data blob signature to all counterparties, while receiving theirs +1. Generate a new confidential identity from our well-known identity +2. Create a ``CertificateOwnershipAssertion`` object containing the new confidential identity (X500 name, public key) +5. Sign this object with the confidential identity's private key +6. Send the confidential identity and aforementioned signature to counter-parties, while receiving theirs 7. Verify the signatures to ensure that identities were generated by the involved set of parties 8. Verify the confidential identities are owned by the expected well known identities 9. Store the confidential identities and return them to the calling flow This ensures not only that the confidential identity X.509 certificates are signed by the correct well-known -identities, but also that the confidential identity private key is held by the counterparty, and that a party cannot +identities, but also that the confidential identity private key is held by the counter-party, and that a party cannot claim ownership of another party's confidential identities. IdentitySyncFlow From 549d2710479d11f96926c9c705767d5b54909b06 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Tue, 6 Mar 2018 14:04:23 +0000 Subject: [PATCH 8/8] fix merge --- .../r3/enclaves/txverify/EnclaveletSerializationScheme.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/verify-enclave/src/main/kotlin/com/r3/enclaves/txverify/EnclaveletSerializationScheme.kt b/verify-enclave/src/main/kotlin/com/r3/enclaves/txverify/EnclaveletSerializationScheme.kt index 38c75114b3..99b546ba8d 100644 --- a/verify-enclave/src/main/kotlin/com/r3/enclaves/txverify/EnclaveletSerializationScheme.kt +++ b/verify-enclave/src/main/kotlin/com/r3/enclaves/txverify/EnclaveletSerializationScheme.kt @@ -6,7 +6,7 @@ import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.utilities.toHexString import net.corda.nodeapi.internal.serialization.CordaSerializationMagic -import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT +import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory @@ -27,10 +27,10 @@ private class EnclaveletSerializationScheme { registerScheme(AMQPVerifierSerializationScheme) }, /** - * Even though default context is set to Kryo P2P, the encoding will be adjusted depending on the + * Even though default context is set to Amqp P2P, the encoding will be adjusted depending on the * incoming request received. */ - KRYO_P2P_CONTEXT) + AMQP_P2P_CONTEXT) /* * Ensure that we initialise JAXP before blacklisting is enabled.