Merge remote-tracking branch 'open/master' into merge/tudor_network_bootstrap

# Conflicts:
#	node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt
This commit is contained in:
tudor.malene@gmail.com 2018-03-06 11:37:30 +00:00
commit 98a4fe1ce3
16 changed files with 227 additions and 147 deletions

View File

@ -42,6 +42,7 @@ buildscript {
ext.jackson_version = '2.9.3'
ext.jetty_version = '9.4.7.v20170914'
ext.jersey_version = '2.25'
ext.json_version = '20180130'
ext.assertj_version = '3.8.0'
ext.slf4j_version = '1.7.25'
ext.log4j_version = '2.9.1'

View File

@ -6,7 +6,7 @@ import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
@ -43,7 +43,7 @@ class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
registerScheme(KryoClientSerializationScheme())
registerScheme(AMQPClientSerializationScheme(emptyList()))
},
KRYO_P2P_CONTEXT,
AMQP_P2P_CONTEXT,
rpcClientContext = KRYO_RPC_CLIENT_CONTEXT)
}
}

View File

@ -12,14 +12,14 @@ import net.corda.core.utilities.unwrap
object IdentitySyncFlow {
/**
* Flow for ensuring that our counterparties in a transaction have the full certificate paths for *our* confidential
* identities used in states present in the transaction. This is intended for use as a subflow of another flow, typically between
* transaction assembly and signing. An example of where this is useful is where a recipient of a [Cash] state wants
* to know that it is being paid by the correct party, and the owner of the state is a confidential identity of that
* party. This flow would send a copy of the confidential identity path to the recipient, enabling them to verify that
* identity.
* Flow for ensuring that our counter-parties in a transaction have the full certificate paths for *our*
* confidential identities used in states present in the transaction. This is intended for use as a sub-flow of
* another flow, typically between transaction assembly and signing. An example of where this is useful is where
* a recipient of a state wants to know that it is being paid by the correct party, and the owner of the state is a
* confidential identity of that party. This flow would send a copy of the confidential identity path to the
* recipient, enabling them to verify that identity.
*/
// TODO: Can this be triggered automatically from [SendTransactionFlow]
// TODO: Can this be triggered automatically from [SendTransactionFlow]?
class Send(val otherSideSessions: Set<FlowSession>,
val tx: WireTransaction,
override val progressTracker: ProgressTracker) : FlowLogic<Unit>() {
@ -81,7 +81,7 @@ object IdentitySyncFlow {
override val progressTracker: ProgressTracker = ProgressTracker(RECEIVING_IDENTITIES, RECEIVING_CERTIFICATES)
@Suspendable
override fun call(): Unit {
override fun call() {
progressTracker.currentStep = RECEIVING_IDENTITIES
val allIdentities = otherSideSession.receive<List<AbstractParty>>().unwrap { it }
val unknownIdentities = allIdentities.filter { serviceHub.identityService.wellKnownPartyFromAnonymous(it) == null }

View File

@ -23,7 +23,7 @@ import java.util.*
/**
* Very basic flow which generates new confidential identities for parties in a transaction and exchanges the transaction
* key and certificate paths between the parties. This is intended for use as a subflow of another flow which builds a
* key and certificate paths between the parties. This is intended for use as a sub-flow of another flow which builds a
* transaction.
*/
@StartableByRPC
@ -38,7 +38,7 @@ class SwapIdentitiesFlow(private val otherParty: Party,
fun tracker() = ProgressTracker(AWAITING_KEY)
/**
* Generate the determinstic data blob the confidential identity's key holder signs to indicate they want to
* Generate the deterministic data blob the confidential identity's key holder signs to indicate they want to
* represent the subject named in the X.509 certificate. Note that this is never actually sent between nodes,
* but only the signature is sent. The blob is built independently on each node and the received signature
* verified against the expected blob, rather than exchanging the blob.
@ -64,7 +64,7 @@ class SwapIdentitiesFlow(private val otherParty: Party,
throw SwapIdentitiesException("Signature does not match the expected identity ownership assertion.", ex)
}
// Validate then store their identity so that we can prove the key in the transaction is owned by the
// counterparty.
// counter-party.
identityService.verifyAndRegisterIdentity(anonymousOtherSide)
return anonymousOtherSide
}
@ -90,8 +90,8 @@ class SwapIdentitiesFlow(private val otherParty: Party,
val confidentialIdentity: PartyAndCertificate = confidentialIdentityBytes.bytes.deserialize()
validateAndRegisterIdentity(serviceHub.identityService, otherParty, confidentialIdentity, theirSigBytes)
}
identities.put(ourIdentity, legalIdentityAnonymous.party.anonymise())
identities.put(otherParty, anonymousOtherSide.party.anonymise())
identities[ourIdentity] = legalIdentityAnonymous.party.anonymise()
identities[otherParty] = anonymousOtherSide.party.anonymise()
}
return identities
}
@ -101,9 +101,9 @@ class SwapIdentitiesFlow(private val otherParty: Party,
}
/**
* Data class used only in the context of asserting the owner of the private key for the listed key wants to use it
* to represent the named entity. This is pairs with an X.509 certificate (which asserts the signing identity says
* the key represents the named entity), but protects against a certificate authority incorrectly claiming others'
* Data class used only in the context of asserting that the owner of the private key for the listed key wants to use it
* to represent the named entity. This is paired with an X.509 certificate (which asserts the signing identity says
* the key represents the named entity) and protects against a malicious party incorrectly claiming others'
* keys.
*/
@CordaSerializable

View File

@ -182,6 +182,8 @@ class ProgressTracker(vararg steps: Step) {
fun endWithError(error: Throwable) {
check(!hasEnded) { "Progress tracker has already ended" }
_changes.onError(error)
_stepsTreeIndexChanges.onError(error)
_stepsTreeChanges.onError(error)
}
/** The parent of this tracker: set automatically by the parent when a tracker is added as a child */

View File

@ -6,7 +6,6 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.nodeapi.internal.serialization.KRYO_CHECKPOINT_CONTEXT
import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT
import net.corda.testing.core.SerializationEnvironmentRule
import org.assertj.core.api.Assertions.assertThat
import org.junit.Rule
@ -39,14 +38,6 @@ class KotlinUtilsTest {
assertThat(copy.transientVal).isEqualTo(copyVal)
}
@Test
fun `serialise transient property with non-capturing lambda`() {
expectedEx.expect(KryoException::class.java)
expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization")
val original = NonCapturingTransientProperty()
original.serialize(context = KRYO_P2P_CONTEXT)
}
@Test
fun `deserialise transient property with non-capturing lambda`() {
expectedEx.expect(KryoException::class.java)
@ -66,14 +57,6 @@ class KotlinUtilsTest {
assertThat(copy.transientVal).startsWith("Hello")
}
@Test
fun `serialise transient property with capturing lambda`() {
expectedEx.expect(KryoException::class.java)
expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization")
val original = CapturingTransientProperty("Hello")
original.serialize(context = KRYO_P2P_CONTEXT)
}
@Test
fun `deserialise transient property with capturing lambda`() {
expectedEx.expect(KryoException::class.java)

View File

@ -74,19 +74,16 @@ You can see an example of using ``SwapIdentitiesFlow`` in ``TwoPartyDealFlow.kt`
``SwapIdentitiesFlow`` goes through the following key steps:
1. Generate a nonce value to form a challenge to the other nodes
2. Send nonce value to all counterparties, and receive their nonce values
3. Generate a new confidential identity from our well-known identity
4. Create a data blob containing the new confidential identity (public key, name and X.509 certificate path),
and the hash of the nonce values
5. Sign the resulting data blob with the confidential identity's private key
6. Send the confidential identity and data blob signature to all counterparties, while receiving theirs
1. Generate a new confidential identity from our well-known identity
2. Create a ``CertificateOwnershipAssertion`` object containing the new confidential identity (X500 name, public key)
5. Sign this object with the confidential identity's private key
6. Send the confidential identity and aforementioned signature to counter-parties, while receiving theirs
7. Verify the signatures to ensure that identities were generated by the involved set of parties
8. Verify the confidential identities are owned by the expected well known identities
9. Store the confidential identities and return them to the calling flow
This ensures not only that the confidential identity X.509 certificates are signed by the correct well-known
identities, but also that the confidential identity private key is held by the counterparty, and that a party cannot
identities, but also that the confidential identity private key is held by the counter-party, and that a party cannot
claim ownership of another party's confidential identities.
IdentitySyncFlow

View File

@ -77,6 +77,25 @@ If IDEA refuses to open a project because an SDK has not been selected, you may
If you are having trouble selecting the correct JDK, the JetBrains website provides the `following guidelines <https://intellij-support.jetbrains.com/hc/en-us/articles/206544879-Selecting-the-JDK-version-the-IDE-will-run-under>`_.
IDEA complains about JVM target
*******************************
If you receive a ``Cannot inline bytecode build with JVM target 1.8 into bytecode...``, please ensure JDK is not
outdated and check that
.. parsed-literal::
Settings/Build, Execution, Deployment/Compiler/Kotlin Compiler/Target JVM Version=1.8
..
IDEA red-lining - Unresolved reference: function
************************************************
If you are running under an outdated SDK, IntelliJ will not complain about lack of an SDK, but you might notice
some functions are red-lined and compilation fails. In this case you should update you SDK, see JetBrains website
`following guidelines <https://intellij-support.jetbrains.com/hc/en-us/articles/206544879-Selecting-the-JDK-version-the-IDE-will-run-under>`_.
IDEA fails to compile Corda because it refuses to find some dependencies
************************************************************************
@ -100,7 +119,7 @@ or checking the
.. parsed-literal::
Settings/Build,Execution,Deployment/Build Tools/Gradle/Runner/Delegate IDE build-run actions to gradle
Settings/Build, Execution, Deployment/Build Tools/Gradle/Runner/Delegate IDE build-run actions to gradle
..

View File

@ -126,37 +126,35 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
}
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
lock.withLock {
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
val properties = HashMap<Any?, Any?>()
for (key in artemisMessage.propertyNames) {
var value = artemisMessage.getObjectProperty(key)
if (value is SimpleString) {
value = value.toString()
}
properties[key.toString()] = value
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
val properties = HashMap<Any?, Any?>()
for (key in artemisMessage.propertyNames) {
var value = artemisMessage.getObjectProperty(key)
if (value is SimpleString) {
value = value.toString()
}
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
val peerInbox = translateLocalQueueToInboxAddress(queueName)
val sendableMessage = amqpClient.createMessage(data, peerInbox,
legalNames.first().toString(),
properties)
sendableMessage.onComplete.then {
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
lock.withLock {
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
artemisMessage.acknowledge()
} else {
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
// We need to commit any acknowledged messages before rolling back the failed
// (unacknowledged) message.
session?.commit()
session?.rollback(false)
}
}
}
amqpClient.write(sendableMessage)
properties[key.toString()] = value
}
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
val peerInbox = translateLocalQueueToInboxAddress(queueName)
val sendableMessage = amqpClient.createMessage(data, peerInbox,
legalNames.first().toString(),
properties)
sendableMessage.onComplete.then {
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
lock.withLock {
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
artemisMessage.acknowledge()
} else {
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
// We need to commit any acknowledged messages before rolling back the failed
// (unacknowledged) message.
session?.commit()
session?.rollback(false)
}
}
}
amqpClient.write(sendableMessage)
}
}

View File

@ -53,6 +53,7 @@ class NetworkBootstrapper {
private const val LOGS_DIR_NAME = "logs"
private const val WHITELIST_FILE_NAME = "whitelist.txt"
private const val EXCLUDE_WHITELIST_FILE_NAME = "exclude_whitelist.txt"
@JvmStatic
fun main(args: Array<String>) {
@ -79,7 +80,7 @@ class NetworkBootstrapper {
println("Gathering notary identities")
val notaryInfos = gatherNotaryInfos(nodeInfoFiles)
println("Notary identities to be used in network parameters: ${notaryInfos.joinToString("; ") { it.prettyPrint() }}")
val mergedWhiteList = generateWhitelist(directory / WHITELIST_FILE_NAME, cordapps?.distinct())
val mergedWhiteList = generateWhitelist(directory / WHITELIST_FILE_NAME, directory / EXCLUDE_WHITELIST_FILE_NAME, cordapps?.distinct())
println("Updating whitelist")
overwriteWhitelist(directory / WHITELIST_FILE_NAME, mergedWhiteList)
installNetworkParameters(notaryInfos, nodeDirs, mergedWhiteList)
@ -187,21 +188,24 @@ class NetworkBootstrapper {
nodeDirs.forEach { copier.install(it) }
}
private fun generateWhitelist(whitelistFile: Path, cordapps: List<String>?): Map<String, List<AttachmentId>> {
private fun generateWhitelist(whitelistFile: Path, excludeWhitelistFile: Path, cordapps: List<String>?): Map<String, List<AttachmentId>> {
val existingWhitelist = if (whitelistFile.exists()) readContractWhitelist(whitelistFile) else emptyMap()
println("Found existing whitelist:")
existingWhitelist.forEach { println(it.outputString()) }
println(if (existingWhitelist.isEmpty()) "No existing whitelist file found." else "Found existing whitelist: ${whitelistFile}")
val newWhiteList: Map<ContractClassName, AttachmentId> = cordapps?.flatMap { cordappJarPath ->
val excludeContracts = if (excludeWhitelistFile.exists()) readExcludeWhitelist(excludeWhitelistFile) else emptyList()
if (excludeContracts.isNotEmpty()) {
println("Exclude contracts from whitelist: ${excludeContracts.joinToString()}}")
}
val newWhiteList = cordapps?.flatMap { cordappJarPath ->
val jarHash = getJarHash(cordappJarPath)
scanJarForContracts(cordappJarPath).map { contract ->
contract to jarHash
}
}?.toMap() ?: emptyMap()
}?.filter { (contractClassName, _) -> contractClassName !in excludeContracts }?.toMap() ?: emptyMap()
println("Calculating whitelist for current CorDapps:")
newWhiteList.forEach { (contract, attachment) -> println("$contract:$attachment") }
println("Calculating whitelist for current installed CorDapps..")
val merged = (newWhiteList.keys + existingWhitelist.keys).map { contractClassName ->
val existing = existingWhitelist[contractClassName] ?: emptyList()
@ -209,15 +213,15 @@ class NetworkBootstrapper {
contractClassName to (if (newHash == null || newHash in existing) existing else existing + newHash)
}.toMap()
println("Final whitelist:")
merged.forEach { println(it.outputString()) }
println("CorDapp whitelist " + (if (existingWhitelist.isEmpty()) "generated" else "updated") + " in ${whitelistFile}")
return merged
}
private fun overwriteWhitelist(whitelistFile: Path, mergedWhiteList: Map<String, List<AttachmentId>>) {
PrintStream(whitelistFile.toFile().outputStream()).use { out ->
mergedWhiteList.forEach { out.println(it.outputString()) }
mergedWhiteList.forEach { (contract, attachments) ->
out.println("${contract}:${attachments.joinToString(",")}")
}
}
}
@ -227,12 +231,14 @@ class NetworkBootstrapper {
SecureHash.SHA256(hs.hash().asBytes())
}
private fun readContractWhitelist(file: Path): Map<String, List<AttachmentId>> = file.toFile().readLines()
private fun readContractWhitelist(file: Path): Map<String, List<AttachmentId>> = file.readAllLines()
.map { line -> line.split(":") }
.map { (contract, attachmentIds) ->
contract to (attachmentIds.split(",").map(::parse))
}.toMap()
private fun readExcludeWhitelist(file: Path): List<String> = file.readAllLines().map(String::trim)
private fun NotaryInfo.prettyPrint(): String = "${identity.name} (${if (validating) "" else "non-"}validating)"
private fun NodeInfo.notaryIdentity(): Party {
@ -246,8 +252,6 @@ class NetworkBootstrapper {
}
}
private fun Map.Entry<ContractClassName, List<AttachmentId>>.outputString() = "$key:${value.joinToString(",")}"
// We need to to set serialization env, because generation of parameters is run from Cordform.
// KryoServerSerializationScheme is not accessible from nodeapi.
private fun initialiseSerialization() {

View File

@ -29,14 +29,7 @@ val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(kryoMagic,
true,
SerializationContext.UseCase.RPCServer,
null)
val KRYO_STORAGE_CONTEXT = SerializationContextImpl(kryoMagic,
SerializationDefaults.javaClass.classLoader,
AllButBlacklisted,
emptyMap(),
true,
SerializationContext.UseCase.Storage,
null,
AlwaysAcceptEncodingWhitelist)
val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic,
SerializationDefaults.javaClass.classLoader,
AllButBlacklisted,
@ -45,6 +38,7 @@ val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic,
SerializationContext.UseCase.Storage,
null,
AlwaysAcceptEncodingWhitelist)
val AMQP_RPC_SERVER_CONTEXT = SerializationContextImpl(amqpMagic,
SerializationDefaults.javaClass.classLoader,
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),

View File

@ -16,14 +16,6 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
* CANNOT always be instantiated outside of the server and so
* MUST be kept separate from these ones!
*/
val KRYO_P2P_CONTEXT = SerializationContextImpl(kryoMagic,
SerializationDefaults.javaClass.classLoader,
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
emptyMap(),
true,
SerializationContext.UseCase.P2P,
null)
val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(kryoMagic,
SerializationDefaults.javaClass.classLoader,
QuasarWhitelist,
@ -32,6 +24,7 @@ val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(kryoMagic,
SerializationContext.UseCase.Checkpoint,
null,
AlwaysAcceptEncodingWhitelist)
val AMQP_P2P_CONTEXT = SerializationContextImpl(amqpMagic,
SerializationDefaults.javaClass.classLoader,
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),

View File

@ -2,13 +2,16 @@
package net.corda.nodeapi.internal.serialization.amqp
import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner
import net.corda.core.cordapp.Cordapp
import net.corda.core.internal.objectOrNewInstance
import net.corda.core.serialization.*
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
import net.corda.core.utilities.ByteSequence
import net.corda.nodeapi.internal.serialization.DefaultWhitelist
import net.corda.nodeapi.internal.serialization.MutableClassWhitelist
import net.corda.nodeapi.internal.serialization.SerializationScheme
import java.lang.reflect.Modifier
import java.security.PublicKey
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@ -28,10 +31,20 @@ fun SerializerFactory.addToWhitelist(vararg types: Class<*>) {
abstract class AbstractAMQPSerializationScheme(val cordappLoader: List<Cordapp>) : SerializationScheme {
// TODO: This method of initialisation for the Whitelist and plugin serializers will have to change
// when we have per-cordapp contexts and dynamic app reloading but for now it's the easiest way
companion object {
private val serializationWhitelists: List<SerializationWhitelist> by lazy {
ServiceLoader.load(SerializationWhitelist::class.java, this::class.java.classLoader).toList() + DefaultWhitelist
}
private val customSerializers: List<SerializationCustomSerializer<*, *>> by lazy {
FastClasspathScanner().addClassLoader(this::class.java.classLoader).scan()
.getNamesOfClassesImplementing(SerializationCustomSerializer::class.java)
.mapNotNull { this::class.java.classLoader.loadClass(it).asSubclass(SerializationCustomSerializer::class.java) }
.filterNot { Modifier.isAbstract(it.modifiers) }
.map { it.kotlin.objectOrNewInstance() }
}
}
private fun registerCustomSerializers(factory: SerializerFactory) {
@ -69,11 +82,20 @@ abstract class AbstractAMQPSerializationScheme(val cordappLoader: List<Cordapp>)
factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray())
}
for (loader in cordappLoader) {
for (schema in loader.serializationCustomSerializers) {
factory.registerExternal(CorDappCustomSerializer(schema, factory))
// If we're passed in an external list we trust that, otherwise revert to looking at the scan of the
// classpath to find custom serializers.
if (cordappLoader.isEmpty()) {
for (customSerializer in customSerializers) {
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
}
} else {
cordappLoader.forEach { loader ->
for (customSerializer in loader.serializationCustomSerializers) {
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
}
}
}
}
private val serializerFactoriesForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>()

View File

@ -105,6 +105,7 @@ dependencies {
// Jackson support: serialisation to/from JSON, YAML, etc
compile project(':client:jackson')
compile group: 'org.json', name: 'json', version: json_version
// Coda Hale's Metrics: for monitoring of key statistics
compile "io.dropwizard.metrics:metrics-core:3.1.2"

View File

@ -6,6 +6,10 @@ import net.corda.core.crypto.toStringShort
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.*
import net.corda.core.utilities.loggerFor
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent
@ -27,13 +31,14 @@ import java.util.*
import kotlin.system.measureNanoTime
import kotlin.system.measureTimeMillis
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
class AMQPBridgeTest {
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
private val log = loggerFor<AMQPBridgeTest>()
private val ALICE = TestIdentity(ALICE_NAME)
private val BOB = TestIdentity(BOB_NAME)
@ -68,14 +73,16 @@ class AMQPBridgeTest {
//Create target server
val amqpServer = createAMQPServer()
val dedupeSet = mutableSetOf<String>()
val receive = amqpServer.onReceive.toBlocking().iterator
amqpServer.start()
val receivedSequence = mutableListOf<Int>()
val atNodeSequence = mutableListOf<Int>()
fun formatMessage(expected: String, actual: Int, received: List<Int>): String {
return "Expected message with id $expected, got $actual, previous message receive sequence: "
return "Expected message with id $expected, got $actual, previous message receive sequence: " +
"${received.joinToString(", ", "[", "]")}."
}
@ -83,66 +90,84 @@ class AMQPBridgeTest {
val messageID1 = received1.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload)
assertEquals(0, messageID1)
dedupeSet += received1.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
received1.complete(true) // Accept first message
receivedSequence.add(messageID1)
receivedSequence += messageID1
atNodeSequence += messageID1
val received2 = receive.next()
val messageID2 = received2.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload)
assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence))
received2.complete(false) // Reject message
receivedSequence.add(messageID2)
received2.complete(false) // Reject message and don't add to dedupe
receivedSequence += messageID2 // reflects actual sequence
// drop things until we get back to the replay
while (true) {
val received3 = receive.next()
val messageID3 = received3.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload)
assertNotEquals(0, messageID3, formatMessage("< 1", messageID3, receivedSequence))
receivedSequence.add(messageID3)
receivedSequence += messageID3
if (messageID3 != 1) { // keep rejecting any batched items following rejection
received3.complete(false)
} else { // beginnings of replay so accept again
received3.complete(true)
val messageId = received3.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
if (messageId !in dedupeSet) {
dedupeSet += messageId
atNodeSequence += messageID3
}
break
}
}
// start receiving again, but discarding duplicates
while (true) {
val received4 = receive.next()
val messageID4 = received4.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload)
receivedSequence.add(messageID4)
if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip
assertEquals(2, messageID4) // next message should be in order though
break
receivedSequence += messageID4
val messageId = received4.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
if (messageId !in dedupeSet) {
dedupeSet += messageId
atNodeSequence += messageID4
}
received4.complete(true)
if (messageID4 == 2) { // started to replay messages after rejection point
break
}
}
// Send a fresh item and check receive
val artemisMessage = artemis.session.createMessage(true).apply {
putIntProperty("CountProp", -1)
writeBodyBufferBytes("Test_end".toByteArray())
putIntProperty("CountProp", 3)
writeBodyBufferBytes("Test3".toByteArray())
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
artemis.producer.send(sourceQueueName, artemisMessage)
// start receiving again, discarding duplicates
while (true) {
val received5 = receive.next()
val messageID5 = received5.applicationProperties["CountProp"] as Int
if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip
assertEquals(-1, messageID5, formatMessage("-1", messageID5, receivedSequence)) // next message should be in order though
assertArrayEquals("Test_end".toByteArray(), received5.payload)
receivedSequence.add(messageID5)
assertArrayEquals("Test$messageID5".toByteArray(), received5.payload)
receivedSequence += messageID5
val messageId = received5.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
if (messageId !in dedupeSet) {
dedupeSet += messageId
atNodeSequence += messageID5
}
received5.complete(true)
if (messageID5 == 3) { // reached our fresh message
break
}
receivedSequence.add(messageID5)
received5.complete(true)
}
println("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}")
log.info("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}")
log.info("Deduped sequence: ${atNodeSequence.joinToString(", ", "[", "]")}")
assertEquals(listOf(0, 1, 2, 3), atNodeSequence)
bridgeManager.stop()
amqpServer.stop()
artemisClient.stop()

View File

@ -1,6 +1,5 @@
package net.corda.node.shell
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.*
@ -15,6 +14,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
@ -22,7 +22,9 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.internal.security.AdminSubject
@ -52,6 +54,7 @@ import org.crsh.util.Utils
import org.crsh.vfs.FS
import org.crsh.vfs.spi.file.FileMountFactory
import org.crsh.vfs.spi.url.ClassPathMountFactory
import org.json.JSONObject
import org.slf4j.LoggerFactory
import rx.Observable
import rx.Subscriber
@ -196,13 +199,35 @@ object InteractiveShell {
}
}
private fun createOutputMapper(factory: JsonFactory): ObjectMapper {
return JacksonSupport.createNonRpcMapper(factory).apply {
private object NodeInfoSerializer : JsonSerializer<NodeInfo>() {
override fun serialize(nodeInfo: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) {
val json = JSONObject()
json["addresses"] = nodeInfo.addresses.map { address -> address.serialise() }
json["legalIdentities"] = nodeInfo.legalIdentities.map { address -> address.serialise() }
json["platformVersion"] = nodeInfo.platformVersion
json["serial"] = nodeInfo.serial
gen.writeRaw(json.toString())
}
private fun NetworkHostAndPort.serialise() = this.toString()
private fun Party.serialise() = JSONObject().put("name", this.name)
private operator fun JSONObject.set(key: String, value: Any?): JSONObject {
return put(key, value)
}
}
private fun createOutputMapper(): ObjectMapper {
return JacksonSupport.createNonRpcMapper().apply {
// Register serializers for stateful objects from libraries that are special to the RPC system and don't
// make sense to print out to the screen. For classes we own, annotations can be used instead.
val rpcModule = SimpleModule()
rpcModule.addSerializer(Observable::class.java, ObservableSerializer)
rpcModule.addSerializer(InputStream::class.java, InputStreamSerializer)
rpcModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer)
registerModule(rpcModule)
disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
@ -211,7 +236,7 @@ object InteractiveShell {
}
// TODO: This should become the default renderer rather than something used specifically by commands.
private val yamlMapper by lazy { createOutputMapper(YAMLFactory()) }
private val outputMapper by lazy { createOutputMapper() }
/**
* Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out
@ -394,11 +419,19 @@ object InteractiveShell {
return result
}
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture<Unit> {
val printerFun = yamlMapper::writeValueAsString
toStream.println(printerFun(response))
toStream.flush()
return maybeFollow(response, printerFun, toStream)
private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture<Unit> {
val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) }
val mappingFunction: (Any?) -> String = { value ->
if (value is Collection<*>) {
value.joinToString(",${System.lineSeparator()} ", "[${System.lineSeparator()} ", "${System.lineSeparator()}]") { element ->
mapElement(element)
}
} else {
mapElement(value)
}
}
return maybeFollow(response, mappingFunction, out)
}
private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() {
@ -421,6 +454,7 @@ object InteractiveShell {
override fun onNext(t: Any?) {
count++
toStream.println("Observation $count: " + printerFun(t))
toStream.flush()
}
@Synchronized
@ -431,25 +465,32 @@ object InteractiveShell {
}
}
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CordaFuture<Unit> {
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
// object graphs that contain yet more observables. So we just look for top level responses that follow
// the standard "track" pattern, and print them until the user presses Ctrl-C
if (response == null) return doneFuture(Unit)
val observable: Observable<*> = when (response) {
is Observable<*> -> response
is DataFeed<*, *> -> {
toStream.println("Snapshot")
toStream.println(response.snapshot)
response.updates
}
else -> return doneFuture(Unit)
if (response is DataFeed<*, *>) {
out.println("Snapshot:")
out.println(printerFun(response.snapshot))
out.flush()
out.println("Updates:")
return printNextElements(response.updates, printerFun, out)
}
if (response is Observable<*>) {
return printNextElements(response, printerFun, out)
}
val subscriber = PrintingSubscriber(printerFun, toStream)
uncheckedCast(observable).subscribe(subscriber)
out.println(printerFun(response))
return doneFuture(Unit)
}
private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
val subscriber = PrintingSubscriber(printerFun, out)
uncheckedCast(elements).subscribe(subscriber)
return subscriber.future
}