diff --git a/build.gradle b/build.gradle index bb17b85d42..ccfbe64219 100644 --- a/build.gradle +++ b/build.gradle @@ -42,6 +42,7 @@ buildscript { ext.jackson_version = '2.9.3' ext.jetty_version = '9.4.7.v20170914' ext.jersey_version = '2.25' + ext.json_version = '20180130' ext.assertj_version = '3.8.0' ext.slf4j_version = '1.7.25' ext.log4j_version = '2.9.1' diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt index 67eab59f15..6132509e21 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt @@ -6,7 +6,7 @@ import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.core.serialization.internal.SerializationEnvironment import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.nodeSerializationEnv -import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT +import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme @@ -43,7 +43,7 @@ class KryoClientSerializationScheme : AbstractKryoSerializationScheme() { registerScheme(KryoClientSerializationScheme()) registerScheme(AMQPClientSerializationScheme(emptyList())) }, - KRYO_P2P_CONTEXT, + AMQP_P2P_CONTEXT, rpcClientContext = KRYO_RPC_CLIENT_CONTEXT) } } diff --git a/confidential-identities/src/main/kotlin/net/corda/confidential/IdentitySyncFlow.kt b/confidential-identities/src/main/kotlin/net/corda/confidential/IdentitySyncFlow.kt index bffff0abf1..d9aac047da 100644 --- a/confidential-identities/src/main/kotlin/net/corda/confidential/IdentitySyncFlow.kt +++ b/confidential-identities/src/main/kotlin/net/corda/confidential/IdentitySyncFlow.kt @@ -12,14 +12,14 @@ import net.corda.core.utilities.unwrap object IdentitySyncFlow { /** - * Flow for ensuring that our counterparties in a transaction have the full certificate paths for *our* confidential - * identities used in states present in the transaction. This is intended for use as a subflow of another flow, typically between - * transaction assembly and signing. An example of where this is useful is where a recipient of a [Cash] state wants - * to know that it is being paid by the correct party, and the owner of the state is a confidential identity of that - * party. This flow would send a copy of the confidential identity path to the recipient, enabling them to verify that - * identity. + * Flow for ensuring that our counter-parties in a transaction have the full certificate paths for *our* + * confidential identities used in states present in the transaction. This is intended for use as a sub-flow of + * another flow, typically between transaction assembly and signing. An example of where this is useful is where + * a recipient of a state wants to know that it is being paid by the correct party, and the owner of the state is a + * confidential identity of that party. This flow would send a copy of the confidential identity path to the + * recipient, enabling them to verify that identity. */ - // TODO: Can this be triggered automatically from [SendTransactionFlow] + // TODO: Can this be triggered automatically from [SendTransactionFlow]? class Send(val otherSideSessions: Set, val tx: WireTransaction, override val progressTracker: ProgressTracker) : FlowLogic() { @@ -81,7 +81,7 @@ object IdentitySyncFlow { override val progressTracker: ProgressTracker = ProgressTracker(RECEIVING_IDENTITIES, RECEIVING_CERTIFICATES) @Suspendable - override fun call(): Unit { + override fun call() { progressTracker.currentStep = RECEIVING_IDENTITIES val allIdentities = otherSideSession.receive>().unwrap { it } val unknownIdentities = allIdentities.filter { serviceHub.identityService.wellKnownPartyFromAnonymous(it) == null } diff --git a/confidential-identities/src/main/kotlin/net/corda/confidential/SwapIdentitiesFlow.kt b/confidential-identities/src/main/kotlin/net/corda/confidential/SwapIdentitiesFlow.kt index f98970c322..ac2ffe15df 100644 --- a/confidential-identities/src/main/kotlin/net/corda/confidential/SwapIdentitiesFlow.kt +++ b/confidential-identities/src/main/kotlin/net/corda/confidential/SwapIdentitiesFlow.kt @@ -23,7 +23,7 @@ import java.util.* /** * Very basic flow which generates new confidential identities for parties in a transaction and exchanges the transaction - * key and certificate paths between the parties. This is intended for use as a subflow of another flow which builds a + * key and certificate paths between the parties. This is intended for use as a sub-flow of another flow which builds a * transaction. */ @StartableByRPC @@ -38,7 +38,7 @@ class SwapIdentitiesFlow(private val otherParty: Party, fun tracker() = ProgressTracker(AWAITING_KEY) /** - * Generate the determinstic data blob the confidential identity's key holder signs to indicate they want to + * Generate the deterministic data blob the confidential identity's key holder signs to indicate they want to * represent the subject named in the X.509 certificate. Note that this is never actually sent between nodes, * but only the signature is sent. The blob is built independently on each node and the received signature * verified against the expected blob, rather than exchanging the blob. @@ -64,7 +64,7 @@ class SwapIdentitiesFlow(private val otherParty: Party, throw SwapIdentitiesException("Signature does not match the expected identity ownership assertion.", ex) } // Validate then store their identity so that we can prove the key in the transaction is owned by the - // counterparty. + // counter-party. identityService.verifyAndRegisterIdentity(anonymousOtherSide) return anonymousOtherSide } @@ -90,8 +90,8 @@ class SwapIdentitiesFlow(private val otherParty: Party, val confidentialIdentity: PartyAndCertificate = confidentialIdentityBytes.bytes.deserialize() validateAndRegisterIdentity(serviceHub.identityService, otherParty, confidentialIdentity, theirSigBytes) } - identities.put(ourIdentity, legalIdentityAnonymous.party.anonymise()) - identities.put(otherParty, anonymousOtherSide.party.anonymise()) + identities[ourIdentity] = legalIdentityAnonymous.party.anonymise() + identities[otherParty] = anonymousOtherSide.party.anonymise() } return identities } @@ -101,9 +101,9 @@ class SwapIdentitiesFlow(private val otherParty: Party, } /** - * Data class used only in the context of asserting the owner of the private key for the listed key wants to use it - * to represent the named entity. This is pairs with an X.509 certificate (which asserts the signing identity says - * the key represents the named entity), but protects against a certificate authority incorrectly claiming others' + * Data class used only in the context of asserting that the owner of the private key for the listed key wants to use it + * to represent the named entity. This is paired with an X.509 certificate (which asserts the signing identity says + * the key represents the named entity) and protects against a malicious party incorrectly claiming others' * keys. */ @CordaSerializable diff --git a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt index 28dfa8d086..bb58a0d1e3 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt @@ -182,6 +182,8 @@ class ProgressTracker(vararg steps: Step) { fun endWithError(error: Throwable) { check(!hasEnded) { "Progress tracker has already ended" } _changes.onError(error) + _stepsTreeIndexChanges.onError(error) + _stepsTreeChanges.onError(error) } /** The parent of this tracker: set automatically by the parent when a tracker is added as a child */ diff --git a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt index e7fbf6a5d0..9c75ca2da0 100644 --- a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt +++ b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt @@ -6,7 +6,6 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.nodeapi.internal.serialization.KRYO_CHECKPOINT_CONTEXT -import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT import net.corda.testing.core.SerializationEnvironmentRule import org.assertj.core.api.Assertions.assertThat import org.junit.Rule @@ -39,14 +38,6 @@ class KotlinUtilsTest { assertThat(copy.transientVal).isEqualTo(copyVal) } - @Test - fun `serialise transient property with non-capturing lambda`() { - expectedEx.expect(KryoException::class.java) - expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") - val original = NonCapturingTransientProperty() - original.serialize(context = KRYO_P2P_CONTEXT) - } - @Test fun `deserialise transient property with non-capturing lambda`() { expectedEx.expect(KryoException::class.java) @@ -66,14 +57,6 @@ class KotlinUtilsTest { assertThat(copy.transientVal).startsWith("Hello") } - @Test - fun `serialise transient property with capturing lambda`() { - expectedEx.expect(KryoException::class.java) - expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") - val original = CapturingTransientProperty("Hello") - original.serialize(context = KRYO_P2P_CONTEXT) - } - @Test fun `deserialise transient property with capturing lambda`() { expectedEx.expect(KryoException::class.java) diff --git a/docs/source/api-identity.rst b/docs/source/api-identity.rst index 77f6896c99..0ace5dd36d 100644 --- a/docs/source/api-identity.rst +++ b/docs/source/api-identity.rst @@ -74,19 +74,16 @@ You can see an example of using ``SwapIdentitiesFlow`` in ``TwoPartyDealFlow.kt` ``SwapIdentitiesFlow`` goes through the following key steps: -1. Generate a nonce value to form a challenge to the other nodes -2. Send nonce value to all counterparties, and receive their nonce values -3. Generate a new confidential identity from our well-known identity -4. Create a data blob containing the new confidential identity (public key, name and X.509 certificate path), - and the hash of the nonce values -5. Sign the resulting data blob with the confidential identity's private key -6. Send the confidential identity and data blob signature to all counterparties, while receiving theirs +1. Generate a new confidential identity from our well-known identity +2. Create a ``CertificateOwnershipAssertion`` object containing the new confidential identity (X500 name, public key) +5. Sign this object with the confidential identity's private key +6. Send the confidential identity and aforementioned signature to counter-parties, while receiving theirs 7. Verify the signatures to ensure that identities were generated by the involved set of parties 8. Verify the confidential identities are owned by the expected well known identities 9. Store the confidential identities and return them to the calling flow This ensures not only that the confidential identity X.509 certificates are signed by the correct well-known -identities, but also that the confidential identity private key is held by the counterparty, and that a party cannot +identities, but also that the confidential identity private key is held by the counter-party, and that a party cannot claim ownership of another party's confidential identities. IdentitySyncFlow diff --git a/docs/source/troubleshooting.rst b/docs/source/troubleshooting.rst index 4176f04ed3..8ec877757d 100644 --- a/docs/source/troubleshooting.rst +++ b/docs/source/troubleshooting.rst @@ -77,6 +77,25 @@ If IDEA refuses to open a project because an SDK has not been selected, you may If you are having trouble selecting the correct JDK, the JetBrains website provides the `following guidelines `_. +IDEA complains about JVM target +******************************* + +If you receive a ``Cannot inline bytecode build with JVM target 1.8 into bytecode...``, please ensure JDK is not +outdated and check that + +.. parsed-literal:: + + Settings/Build, Execution, Deployment/Compiler/Kotlin Compiler/Target JVM Version=1.8 + +.. + +IDEA red-lining - Unresolved reference: function +************************************************ + +If you are running under an outdated SDK, IntelliJ will not complain about lack of an SDK, but you might notice +some functions are red-lined and compilation fails. In this case you should update you SDK, see JetBrains website +`following guidelines `_. + IDEA fails to compile Corda because it refuses to find some dependencies ************************************************************************ @@ -100,7 +119,7 @@ or checking the .. parsed-literal:: - Settings/Build,Execution,Deployment/Build Tools/Gradle/Runner/Delegate IDE build-run actions to gradle + Settings/Build, Execution, Deployment/Build Tools/Gradle/Runner/Delegate IDE build-run actions to gradle .. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 94ddb681ac..13b73bf3b5 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -126,37 +126,35 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa } private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { - lock.withLock { - val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) } - val properties = HashMap() - for (key in artemisMessage.propertyNames) { - var value = artemisMessage.getObjectProperty(key) - if (value is SimpleString) { - value = value.toString() - } - properties[key.toString()] = value + val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) } + val properties = HashMap() + for (key in artemisMessage.propertyNames) { + var value = artemisMessage.getObjectProperty(key) + if (value is SimpleString) { + value = value.toString() } - log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } - val peerInbox = translateLocalQueueToInboxAddress(queueName) - val sendableMessage = amqpClient.createMessage(data, peerInbox, - legalNames.first().toString(), - properties) - sendableMessage.onComplete.then { - log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" } - lock.withLock { - if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { - artemisMessage.acknowledge() - } else { - log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") - // We need to commit any acknowledged messages before rolling back the failed - // (unacknowledged) message. - session?.commit() - session?.rollback(false) - } - } - } - amqpClient.write(sendableMessage) + properties[key.toString()] = value } + log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } + val peerInbox = translateLocalQueueToInboxAddress(queueName) + val sendableMessage = amqpClient.createMessage(data, peerInbox, + legalNames.first().toString(), + properties) + sendableMessage.onComplete.then { + log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" } + lock.withLock { + if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { + artemisMessage.acknowledge() + } else { + log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") + // We need to commit any acknowledged messages before rolling back the failed + // (unacknowledged) message. + session?.commit() + session?.rollback(false) + } + } + } + amqpClient.write(sendableMessage) } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index 2e1598d197..b554ed9c50 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -53,6 +53,7 @@ class NetworkBootstrapper { private const val LOGS_DIR_NAME = "logs" private const val WHITELIST_FILE_NAME = "whitelist.txt" + private const val EXCLUDE_WHITELIST_FILE_NAME = "exclude_whitelist.txt" @JvmStatic fun main(args: Array) { @@ -79,7 +80,7 @@ class NetworkBootstrapper { println("Gathering notary identities") val notaryInfos = gatherNotaryInfos(nodeInfoFiles) println("Notary identities to be used in network parameters: ${notaryInfos.joinToString("; ") { it.prettyPrint() }}") - val mergedWhiteList = generateWhitelist(directory / WHITELIST_FILE_NAME, cordapps?.distinct()) + val mergedWhiteList = generateWhitelist(directory / WHITELIST_FILE_NAME, directory / EXCLUDE_WHITELIST_FILE_NAME, cordapps?.distinct()) println("Updating whitelist") overwriteWhitelist(directory / WHITELIST_FILE_NAME, mergedWhiteList) installNetworkParameters(notaryInfos, nodeDirs, mergedWhiteList) @@ -187,21 +188,24 @@ class NetworkBootstrapper { nodeDirs.forEach { copier.install(it) } } - private fun generateWhitelist(whitelistFile: Path, cordapps: List?): Map> { + private fun generateWhitelist(whitelistFile: Path, excludeWhitelistFile: Path, cordapps: List?): Map> { val existingWhitelist = if (whitelistFile.exists()) readContractWhitelist(whitelistFile) else emptyMap() - println("Found existing whitelist:") - existingWhitelist.forEach { println(it.outputString()) } + println(if (existingWhitelist.isEmpty()) "No existing whitelist file found." else "Found existing whitelist: ${whitelistFile}") - val newWhiteList: Map = cordapps?.flatMap { cordappJarPath -> + val excludeContracts = if (excludeWhitelistFile.exists()) readExcludeWhitelist(excludeWhitelistFile) else emptyList() + if (excludeContracts.isNotEmpty()) { + println("Exclude contracts from whitelist: ${excludeContracts.joinToString()}}") + } + + val newWhiteList = cordapps?.flatMap { cordappJarPath -> val jarHash = getJarHash(cordappJarPath) scanJarForContracts(cordappJarPath).map { contract -> contract to jarHash } - }?.toMap() ?: emptyMap() + }?.filter { (contractClassName, _) -> contractClassName !in excludeContracts }?.toMap() ?: emptyMap() - println("Calculating whitelist for current CorDapps:") - newWhiteList.forEach { (contract, attachment) -> println("$contract:$attachment") } + println("Calculating whitelist for current installed CorDapps..") val merged = (newWhiteList.keys + existingWhitelist.keys).map { contractClassName -> val existing = existingWhitelist[contractClassName] ?: emptyList() @@ -209,15 +213,15 @@ class NetworkBootstrapper { contractClassName to (if (newHash == null || newHash in existing) existing else existing + newHash) }.toMap() - println("Final whitelist:") - merged.forEach { println(it.outputString()) } - + println("CorDapp whitelist " + (if (existingWhitelist.isEmpty()) "generated" else "updated") + " in ${whitelistFile}") return merged } private fun overwriteWhitelist(whitelistFile: Path, mergedWhiteList: Map>) { PrintStream(whitelistFile.toFile().outputStream()).use { out -> - mergedWhiteList.forEach { out.println(it.outputString()) } + mergedWhiteList.forEach { (contract, attachments) -> + out.println("${contract}:${attachments.joinToString(",")}") + } } } @@ -227,12 +231,14 @@ class NetworkBootstrapper { SecureHash.SHA256(hs.hash().asBytes()) } - private fun readContractWhitelist(file: Path): Map> = file.toFile().readLines() + private fun readContractWhitelist(file: Path): Map> = file.readAllLines() .map { line -> line.split(":") } .map { (contract, attachmentIds) -> contract to (attachmentIds.split(",").map(::parse)) }.toMap() + private fun readExcludeWhitelist(file: Path): List = file.readAllLines().map(String::trim) + private fun NotaryInfo.prettyPrint(): String = "${identity.name} (${if (validating) "" else "non-"}validating)" private fun NodeInfo.notaryIdentity(): Party { @@ -246,8 +252,6 @@ class NetworkBootstrapper { } } - private fun Map.Entry>.outputString() = "$key:${value.joinToString(",")}" - // We need to to set serialization env, because generation of parameters is run from Cordform. // KryoServerSerializationScheme is not accessible from nodeapi. private fun initialiseSerialization() { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt index aa939c5002..c18d7f4228 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt @@ -29,14 +29,7 @@ val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(kryoMagic, true, SerializationContext.UseCase.RPCServer, null) -val KRYO_STORAGE_CONTEXT = SerializationContextImpl(kryoMagic, - SerializationDefaults.javaClass.classLoader, - AllButBlacklisted, - emptyMap(), - true, - SerializationContext.UseCase.Storage, - null, - AlwaysAcceptEncodingWhitelist) + val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, AllButBlacklisted, @@ -45,6 +38,7 @@ val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic, SerializationContext.UseCase.Storage, null, AlwaysAcceptEncodingWhitelist) + val AMQP_RPC_SERVER_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt index 854866a506..0a6d414bc0 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt @@ -17,14 +17,6 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic * CANNOT always be instantiated outside of the server and so * MUST be kept separate from these ones! */ - -val KRYO_P2P_CONTEXT = SerializationContextImpl(kryoMagic, - SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.P2P, - null) val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(kryoMagic, SerializationDefaults.javaClass.classLoader, QuasarWhitelist, @@ -33,6 +25,7 @@ val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(kryoMagic, SerializationContext.UseCase.Checkpoint, SNAPPY, AlwaysAcceptEncodingWhitelist) + val AMQP_P2P_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index 8d07afcace..f099ce7af5 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -2,13 +2,16 @@ package net.corda.nodeapi.internal.serialization.amqp +import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner import net.corda.core.cordapp.Cordapp +import net.corda.core.internal.objectOrNewInstance import net.corda.core.serialization.* import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.core.utilities.ByteSequence import net.corda.nodeapi.internal.serialization.DefaultWhitelist import net.corda.nodeapi.internal.serialization.MutableClassWhitelist import net.corda.nodeapi.internal.serialization.SerializationScheme +import java.lang.reflect.Modifier import java.security.PublicKey import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -28,10 +31,20 @@ fun SerializerFactory.addToWhitelist(vararg types: Class<*>) { abstract class AbstractAMQPSerializationScheme(val cordappLoader: List) : SerializationScheme { + // TODO: This method of initialisation for the Whitelist and plugin serializers will have to change + // when we have per-cordapp contexts and dynamic app reloading but for now it's the easiest way companion object { private val serializationWhitelists: List by lazy { ServiceLoader.load(SerializationWhitelist::class.java, this::class.java.classLoader).toList() + DefaultWhitelist } + + private val customSerializers: List> by lazy { + FastClasspathScanner().addClassLoader(this::class.java.classLoader).scan() + .getNamesOfClassesImplementing(SerializationCustomSerializer::class.java) + .mapNotNull { this::class.java.classLoader.loadClass(it).asSubclass(SerializationCustomSerializer::class.java) } + .filterNot { Modifier.isAbstract(it.modifiers) } + .map { it.kotlin.objectOrNewInstance() } + } } private fun registerCustomSerializers(factory: SerializerFactory) { @@ -69,11 +82,20 @@ abstract class AbstractAMQPSerializationScheme(val cordappLoader: List) factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray()) } - for (loader in cordappLoader) { - for (schema in loader.serializationCustomSerializers) { - factory.registerExternal(CorDappCustomSerializer(schema, factory)) + // If we're passed in an external list we trust that, otherwise revert to looking at the scan of the + // classpath to find custom serializers. + if (cordappLoader.isEmpty()) { + for (customSerializer in customSerializers) { + factory.registerExternal(CorDappCustomSerializer(customSerializer, factory)) + } + } else { + cordappLoader.forEach { loader -> + for (customSerializer in loader.serializationCustomSerializers) { + factory.registerExternal(CorDappCustomSerializer(customSerializer, factory)) + } } } + } private val serializerFactoriesForContexts = ConcurrentHashMap, SerializerFactory>() diff --git a/node/build.gradle b/node/build.gradle index 2d4347d9bc..231f1ddf8b 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -105,6 +105,7 @@ dependencies { // Jackson support: serialisation to/from JSON, YAML, etc compile project(':client:jackson') + compile group: 'org.json', name: 'json', version: json_version // Coda Hale's Metrics: for monitoring of key statistics compile "io.dropwizard.metrics:metrics-core:3.1.2" diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 72b256eefc..03f23b039f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -6,6 +6,10 @@ import net.corda.core.crypto.toStringShort import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.services.config.* +import net.corda.core.utilities.loggerFor +import net.corda.node.services.config.CertChainPolicyConfig +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent @@ -27,13 +31,14 @@ import java.util.* import kotlin.system.measureNanoTime import kotlin.system.measureTimeMillis import kotlin.test.assertEquals -import kotlin.test.assertNotEquals class AMQPBridgeTest { @Rule @JvmField val temporaryFolder = TemporaryFolder() + private val log = loggerFor() + private val ALICE = TestIdentity(ALICE_NAME) private val BOB = TestIdentity(BOB_NAME) @@ -68,14 +73,16 @@ class AMQPBridgeTest { //Create target server val amqpServer = createAMQPServer() + val dedupeSet = mutableSetOf() val receive = amqpServer.onReceive.toBlocking().iterator amqpServer.start() val receivedSequence = mutableListOf() + val atNodeSequence = mutableListOf() fun formatMessage(expected: String, actual: Int, received: List): String { - return "Expected message with id $expected, got $actual, previous message receive sequence: " + return "Expected message with id $expected, got $actual, previous message receive sequence: " + "${received.joinToString(", ", "[", "]")}." } @@ -83,66 +90,84 @@ class AMQPBridgeTest { val messageID1 = received1.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID1".toByteArray(), received1.payload) assertEquals(0, messageID1) + dedupeSet += received1.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String received1.complete(true) // Accept first message - receivedSequence.add(messageID1) + receivedSequence += messageID1 + atNodeSequence += messageID1 val received2 = receive.next() val messageID2 = received2.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID2".toByteArray(), received2.payload) assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence)) - received2.complete(false) // Reject message - receivedSequence.add(messageID2) + received2.complete(false) // Reject message and don't add to dedupe + receivedSequence += messageID2 // reflects actual sequence + // drop things until we get back to the replay while (true) { val received3 = receive.next() val messageID3 = received3.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID3".toByteArray(), received3.payload) - assertNotEquals(0, messageID3, formatMessage("< 1", messageID3, receivedSequence)) - receivedSequence.add(messageID3) + receivedSequence += messageID3 if (messageID3 != 1) { // keep rejecting any batched items following rejection received3.complete(false) } else { // beginnings of replay so accept again received3.complete(true) + val messageId = received3.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String + if (messageId !in dedupeSet) { + dedupeSet += messageId + atNodeSequence += messageID3 + } break } } + // start receiving again, but discarding duplicates while (true) { val received4 = receive.next() val messageID4 = received4.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID4".toByteArray(), received4.payload) - receivedSequence.add(messageID4) - if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip - assertEquals(2, messageID4) // next message should be in order though - break + receivedSequence += messageID4 + val messageId = received4.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String + if (messageId !in dedupeSet) { + dedupeSet += messageId + atNodeSequence += messageID4 } received4.complete(true) + if (messageID4 == 2) { // started to replay messages after rejection point + break + } } // Send a fresh item and check receive val artemisMessage = artemis.session.createMessage(true).apply { - putIntProperty("CountProp", -1) - writeBodyBufferBytes("Test_end".toByteArray()) + putIntProperty("CountProp", 3) + writeBodyBufferBytes("Test3".toByteArray()) // Use the magic deduplication property built into Artemis as our message identity too putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) } artemis.producer.send(sourceQueueName, artemisMessage) + // start receiving again, discarding duplicates while (true) { val received5 = receive.next() val messageID5 = received5.applicationProperties["CountProp"] as Int - if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip - assertEquals(-1, messageID5, formatMessage("-1", messageID5, receivedSequence)) // next message should be in order though - assertArrayEquals("Test_end".toByteArray(), received5.payload) - receivedSequence.add(messageID5) + assertArrayEquals("Test$messageID5".toByteArray(), received5.payload) + receivedSequence += messageID5 + val messageId = received5.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String + if (messageId !in dedupeSet) { + dedupeSet += messageId + atNodeSequence += messageID5 + } + received5.complete(true) + if (messageID5 == 3) { // reached our fresh message break } - receivedSequence.add(messageID5) - received5.complete(true) } - println("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}") + log.info("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}") + log.info("Deduped sequence: ${atNodeSequence.joinToString(", ", "[", "]")}") + assertEquals(listOf(0, 1, 2, 3), atNodeSequence) bridgeManager.stop() amqpServer.stop() artemisClient.stop() diff --git a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt index 57fb06dee1..8cb160b908 100644 --- a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt +++ b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt @@ -1,6 +1,5 @@ package net.corda.node.shell -import com.fasterxml.jackson.core.JsonFactory import com.fasterxml.jackson.core.JsonGenerator import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.databind.* @@ -15,6 +14,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.UniqueIdentifier import net.corda.core.flows.FlowLogic import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party import net.corda.core.internal.* import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture @@ -22,7 +22,9 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.DataFeed import net.corda.core.messaging.FlowProgressHandle import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.node.NodeInfo import net.corda.core.node.services.IdentityService +import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.Node import net.corda.node.internal.StartedNode import net.corda.node.internal.security.AdminSubject @@ -52,6 +54,7 @@ import org.crsh.util.Utils import org.crsh.vfs.FS import org.crsh.vfs.spi.file.FileMountFactory import org.crsh.vfs.spi.url.ClassPathMountFactory +import org.json.JSONObject import org.slf4j.LoggerFactory import rx.Observable import rx.Subscriber @@ -196,13 +199,35 @@ object InteractiveShell { } } - private fun createOutputMapper(factory: JsonFactory): ObjectMapper { - return JacksonSupport.createNonRpcMapper(factory).apply { + private object NodeInfoSerializer : JsonSerializer() { + + override fun serialize(nodeInfo: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) { + + val json = JSONObject() + json["addresses"] = nodeInfo.addresses.map { address -> address.serialise() } + json["legalIdentities"] = nodeInfo.legalIdentities.map { address -> address.serialise() } + json["platformVersion"] = nodeInfo.platformVersion + json["serial"] = nodeInfo.serial + gen.writeRaw(json.toString()) + } + + private fun NetworkHostAndPort.serialise() = this.toString() + private fun Party.serialise() = JSONObject().put("name", this.name) + + private operator fun JSONObject.set(key: String, value: Any?): JSONObject { + return put(key, value) + } + } + + private fun createOutputMapper(): ObjectMapper { + + return JacksonSupport.createNonRpcMapper().apply { // Register serializers for stateful objects from libraries that are special to the RPC system and don't // make sense to print out to the screen. For classes we own, annotations can be used instead. val rpcModule = SimpleModule() rpcModule.addSerializer(Observable::class.java, ObservableSerializer) rpcModule.addSerializer(InputStream::class.java, InputStreamSerializer) + rpcModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer) registerModule(rpcModule) disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) @@ -211,7 +236,7 @@ object InteractiveShell { } // TODO: This should become the default renderer rather than something used specifically by commands. - private val yamlMapper by lazy { createOutputMapper(YAMLFactory()) } + private val outputMapper by lazy { createOutputMapper() } /** * Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out @@ -394,11 +419,19 @@ object InteractiveShell { return result } - private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture { - val printerFun = yamlMapper::writeValueAsString - toStream.println(printerFun(response)) - toStream.flush() - return maybeFollow(response, printerFun, toStream) + private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture { + + val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } + val mappingFunction: (Any?) -> String = { value -> + if (value is Collection<*>) { + value.joinToString(",${System.lineSeparator()} ", "[${System.lineSeparator()} ", "${System.lineSeparator()}]") { element -> + mapElement(element) + } + } else { + mapElement(value) + } + } + return maybeFollow(response, mappingFunction, out) } private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber() { @@ -421,6 +454,7 @@ object InteractiveShell { override fun onNext(t: Any?) { count++ toStream.println("Observation $count: " + printerFun(t)) + toStream.flush() } @Synchronized @@ -431,25 +465,32 @@ object InteractiveShell { } } - private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CordaFuture { + private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C if (response == null) return doneFuture(Unit) - val observable: Observable<*> = when (response) { - is Observable<*> -> response - is DataFeed<*, *> -> { - toStream.println("Snapshot") - toStream.println(response.snapshot) - response.updates - } - else -> return doneFuture(Unit) + if (response is DataFeed<*, *>) { + out.println("Snapshot:") + out.println(printerFun(response.snapshot)) + out.flush() + out.println("Updates:") + return printNextElements(response.updates, printerFun, out) + } + if (response is Observable<*>) { + return printNextElements(response, printerFun, out) } - val subscriber = PrintingSubscriber(printerFun, toStream) - uncheckedCast(observable).subscribe(subscriber) + out.println(printerFun(response)) + return doneFuture(Unit) + } + + private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { + + val subscriber = PrintingSubscriber(printerFun, out) + uncheckedCast(elements).subscribe(subscriber) return subscriber.future } diff --git a/verify-enclave/src/main/kotlin/com/r3/enclaves/txverify/EnclaveletSerializationScheme.kt b/verify-enclave/src/main/kotlin/com/r3/enclaves/txverify/EnclaveletSerializationScheme.kt index 38c75114b3..99b546ba8d 100644 --- a/verify-enclave/src/main/kotlin/com/r3/enclaves/txverify/EnclaveletSerializationScheme.kt +++ b/verify-enclave/src/main/kotlin/com/r3/enclaves/txverify/EnclaveletSerializationScheme.kt @@ -6,7 +6,7 @@ import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.utilities.toHexString import net.corda.nodeapi.internal.serialization.CordaSerializationMagic -import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT +import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory @@ -27,10 +27,10 @@ private class EnclaveletSerializationScheme { registerScheme(AMQPVerifierSerializationScheme) }, /** - * Even though default context is set to Kryo P2P, the encoding will be adjusted depending on the + * Even though default context is set to Amqp P2P, the encoding will be adjusted depending on the * incoming request received. */ - KRYO_P2P_CONTEXT) + AMQP_P2P_CONTEXT) /* * Ensure that we initialise JAXP before blacklisting is enabled.