mirror of
https://github.com/corda/corda.git
synced 2025-01-14 00:39:57 +00:00
Merge pull request #515 from corda/merge/tudor_network_bootstrap
Merge/tudor network bootstrap
This commit is contained in:
commit
ad4bed779d
@ -42,6 +42,7 @@ buildscript {
|
||||
ext.jackson_version = '2.9.3'
|
||||
ext.jetty_version = '9.4.7.v20170914'
|
||||
ext.jersey_version = '2.25'
|
||||
ext.json_version = '20180130'
|
||||
ext.assertj_version = '3.8.0'
|
||||
ext.slf4j_version = '1.7.25'
|
||||
ext.log4j_version = '2.9.1'
|
||||
|
@ -6,7 +6,7 @@ import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||
import net.corda.core.serialization.internal.SerializationEnvironment
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
@ -43,7 +43,7 @@ class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
|
||||
registerScheme(KryoClientSerializationScheme())
|
||||
registerScheme(AMQPClientSerializationScheme(emptyList()))
|
||||
},
|
||||
KRYO_P2P_CONTEXT,
|
||||
AMQP_P2P_CONTEXT,
|
||||
rpcClientContext = KRYO_RPC_CLIENT_CONTEXT)
|
||||
}
|
||||
}
|
||||
|
@ -12,14 +12,14 @@ import net.corda.core.utilities.unwrap
|
||||
|
||||
object IdentitySyncFlow {
|
||||
/**
|
||||
* Flow for ensuring that our counterparties in a transaction have the full certificate paths for *our* confidential
|
||||
* identities used in states present in the transaction. This is intended for use as a subflow of another flow, typically between
|
||||
* transaction assembly and signing. An example of where this is useful is where a recipient of a [Cash] state wants
|
||||
* to know that it is being paid by the correct party, and the owner of the state is a confidential identity of that
|
||||
* party. This flow would send a copy of the confidential identity path to the recipient, enabling them to verify that
|
||||
* identity.
|
||||
* Flow for ensuring that our counter-parties in a transaction have the full certificate paths for *our*
|
||||
* confidential identities used in states present in the transaction. This is intended for use as a sub-flow of
|
||||
* another flow, typically between transaction assembly and signing. An example of where this is useful is where
|
||||
* a recipient of a state wants to know that it is being paid by the correct party, and the owner of the state is a
|
||||
* confidential identity of that party. This flow would send a copy of the confidential identity path to the
|
||||
* recipient, enabling them to verify that identity.
|
||||
*/
|
||||
// TODO: Can this be triggered automatically from [SendTransactionFlow]
|
||||
// TODO: Can this be triggered automatically from [SendTransactionFlow]?
|
||||
class Send(val otherSideSessions: Set<FlowSession>,
|
||||
val tx: WireTransaction,
|
||||
override val progressTracker: ProgressTracker) : FlowLogic<Unit>() {
|
||||
@ -81,7 +81,7 @@ object IdentitySyncFlow {
|
||||
override val progressTracker: ProgressTracker = ProgressTracker(RECEIVING_IDENTITIES, RECEIVING_CERTIFICATES)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Unit {
|
||||
override fun call() {
|
||||
progressTracker.currentStep = RECEIVING_IDENTITIES
|
||||
val allIdentities = otherSideSession.receive<List<AbstractParty>>().unwrap { it }
|
||||
val unknownIdentities = allIdentities.filter { serviceHub.identityService.wellKnownPartyFromAnonymous(it) == null }
|
||||
|
@ -23,7 +23,7 @@ import java.util.*
|
||||
|
||||
/**
|
||||
* Very basic flow which generates new confidential identities for parties in a transaction and exchanges the transaction
|
||||
* key and certificate paths between the parties. This is intended for use as a subflow of another flow which builds a
|
||||
* key and certificate paths between the parties. This is intended for use as a sub-flow of another flow which builds a
|
||||
* transaction.
|
||||
*/
|
||||
@StartableByRPC
|
||||
@ -38,7 +38,7 @@ class SwapIdentitiesFlow(private val otherParty: Party,
|
||||
|
||||
fun tracker() = ProgressTracker(AWAITING_KEY)
|
||||
/**
|
||||
* Generate the determinstic data blob the confidential identity's key holder signs to indicate they want to
|
||||
* Generate the deterministic data blob the confidential identity's key holder signs to indicate they want to
|
||||
* represent the subject named in the X.509 certificate. Note that this is never actually sent between nodes,
|
||||
* but only the signature is sent. The blob is built independently on each node and the received signature
|
||||
* verified against the expected blob, rather than exchanging the blob.
|
||||
@ -64,7 +64,7 @@ class SwapIdentitiesFlow(private val otherParty: Party,
|
||||
throw SwapIdentitiesException("Signature does not match the expected identity ownership assertion.", ex)
|
||||
}
|
||||
// Validate then store their identity so that we can prove the key in the transaction is owned by the
|
||||
// counterparty.
|
||||
// counter-party.
|
||||
identityService.verifyAndRegisterIdentity(anonymousOtherSide)
|
||||
return anonymousOtherSide
|
||||
}
|
||||
@ -90,8 +90,8 @@ class SwapIdentitiesFlow(private val otherParty: Party,
|
||||
val confidentialIdentity: PartyAndCertificate = confidentialIdentityBytes.bytes.deserialize()
|
||||
validateAndRegisterIdentity(serviceHub.identityService, otherParty, confidentialIdentity, theirSigBytes)
|
||||
}
|
||||
identities.put(ourIdentity, legalIdentityAnonymous.party.anonymise())
|
||||
identities.put(otherParty, anonymousOtherSide.party.anonymise())
|
||||
identities[ourIdentity] = legalIdentityAnonymous.party.anonymise()
|
||||
identities[otherParty] = anonymousOtherSide.party.anonymise()
|
||||
}
|
||||
return identities
|
||||
}
|
||||
@ -101,9 +101,9 @@ class SwapIdentitiesFlow(private val otherParty: Party,
|
||||
}
|
||||
|
||||
/**
|
||||
* Data class used only in the context of asserting the owner of the private key for the listed key wants to use it
|
||||
* to represent the named entity. This is pairs with an X.509 certificate (which asserts the signing identity says
|
||||
* the key represents the named entity), but protects against a certificate authority incorrectly claiming others'
|
||||
* Data class used only in the context of asserting that the owner of the private key for the listed key wants to use it
|
||||
* to represent the named entity. This is paired with an X.509 certificate (which asserts the signing identity says
|
||||
* the key represents the named entity) and protects against a malicious party incorrectly claiming others'
|
||||
* keys.
|
||||
*/
|
||||
@CordaSerializable
|
||||
|
@ -182,6 +182,8 @@ class ProgressTracker(vararg steps: Step) {
|
||||
fun endWithError(error: Throwable) {
|
||||
check(!hasEnded) { "Progress tracker has already ended" }
|
||||
_changes.onError(error)
|
||||
_stepsTreeIndexChanges.onError(error)
|
||||
_stepsTreeChanges.onError(error)
|
||||
}
|
||||
|
||||
/** The parent of this tracker: set automatically by the parent when a tracker is added as a child */
|
||||
|
@ -6,7 +6,6 @@ import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.nodeapi.internal.serialization.KRYO_CHECKPOINT_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Rule
|
||||
@ -39,14 +38,6 @@ class KotlinUtilsTest {
|
||||
assertThat(copy.transientVal).isEqualTo(copyVal)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `serialise transient property with non-capturing lambda`() {
|
||||
expectedEx.expect(KryoException::class.java)
|
||||
expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization")
|
||||
val original = NonCapturingTransientProperty()
|
||||
original.serialize(context = KRYO_P2P_CONTEXT)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `deserialise transient property with non-capturing lambda`() {
|
||||
expectedEx.expect(KryoException::class.java)
|
||||
@ -66,14 +57,6 @@ class KotlinUtilsTest {
|
||||
assertThat(copy.transientVal).startsWith("Hello")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `serialise transient property with capturing lambda`() {
|
||||
expectedEx.expect(KryoException::class.java)
|
||||
expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization")
|
||||
val original = CapturingTransientProperty("Hello")
|
||||
original.serialize(context = KRYO_P2P_CONTEXT)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `deserialise transient property with capturing lambda`() {
|
||||
expectedEx.expect(KryoException::class.java)
|
||||
|
@ -74,19 +74,16 @@ You can see an example of using ``SwapIdentitiesFlow`` in ``TwoPartyDealFlow.kt`
|
||||
|
||||
``SwapIdentitiesFlow`` goes through the following key steps:
|
||||
|
||||
1. Generate a nonce value to form a challenge to the other nodes
|
||||
2. Send nonce value to all counterparties, and receive their nonce values
|
||||
3. Generate a new confidential identity from our well-known identity
|
||||
4. Create a data blob containing the new confidential identity (public key, name and X.509 certificate path),
|
||||
and the hash of the nonce values
|
||||
5. Sign the resulting data blob with the confidential identity's private key
|
||||
6. Send the confidential identity and data blob signature to all counterparties, while receiving theirs
|
||||
1. Generate a new confidential identity from our well-known identity
|
||||
2. Create a ``CertificateOwnershipAssertion`` object containing the new confidential identity (X500 name, public key)
|
||||
5. Sign this object with the confidential identity's private key
|
||||
6. Send the confidential identity and aforementioned signature to counter-parties, while receiving theirs
|
||||
7. Verify the signatures to ensure that identities were generated by the involved set of parties
|
||||
8. Verify the confidential identities are owned by the expected well known identities
|
||||
9. Store the confidential identities and return them to the calling flow
|
||||
|
||||
This ensures not only that the confidential identity X.509 certificates are signed by the correct well-known
|
||||
identities, but also that the confidential identity private key is held by the counterparty, and that a party cannot
|
||||
identities, but also that the confidential identity private key is held by the counter-party, and that a party cannot
|
||||
claim ownership of another party's confidential identities.
|
||||
|
||||
IdentitySyncFlow
|
||||
|
@ -77,6 +77,25 @@ If IDEA refuses to open a project because an SDK has not been selected, you may
|
||||
|
||||
If you are having trouble selecting the correct JDK, the JetBrains website provides the `following guidelines <https://intellij-support.jetbrains.com/hc/en-us/articles/206544879-Selecting-the-JDK-version-the-IDE-will-run-under>`_.
|
||||
|
||||
IDEA complains about JVM target
|
||||
*******************************
|
||||
|
||||
If you receive a ``Cannot inline bytecode build with JVM target 1.8 into bytecode...``, please ensure JDK is not
|
||||
outdated and check that
|
||||
|
||||
.. parsed-literal::
|
||||
|
||||
Settings/Build, Execution, Deployment/Compiler/Kotlin Compiler/Target JVM Version=1.8
|
||||
|
||||
..
|
||||
|
||||
IDEA red-lining - Unresolved reference: function
|
||||
************************************************
|
||||
|
||||
If you are running under an outdated SDK, IntelliJ will not complain about lack of an SDK, but you might notice
|
||||
some functions are red-lined and compilation fails. In this case you should update you SDK, see JetBrains website
|
||||
`following guidelines <https://intellij-support.jetbrains.com/hc/en-us/articles/206544879-Selecting-the-JDK-version-the-IDE-will-run-under>`_.
|
||||
|
||||
IDEA fails to compile Corda because it refuses to find some dependencies
|
||||
************************************************************************
|
||||
|
||||
@ -100,7 +119,7 @@ or checking the
|
||||
|
||||
.. parsed-literal::
|
||||
|
||||
Settings/Build,Execution,Deployment/Build Tools/Gradle/Runner/Delegate IDE build-run actions to gradle
|
||||
Settings/Build, Execution, Deployment/Build Tools/Gradle/Runner/Delegate IDE build-run actions to gradle
|
||||
|
||||
..
|
||||
|
||||
|
@ -126,37 +126,35 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
|
||||
}
|
||||
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
lock.withLock {
|
||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||
val properties = HashMap<Any?, Any?>()
|
||||
for (key in artemisMessage.propertyNames) {
|
||||
var value = artemisMessage.getObjectProperty(key)
|
||||
if (value is SimpleString) {
|
||||
value = value.toString()
|
||||
}
|
||||
properties[key.toString()] = value
|
||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||
val properties = HashMap<Any?, Any?>()
|
||||
for (key in artemisMessage.propertyNames) {
|
||||
var value = artemisMessage.getObjectProperty(key)
|
||||
if (value is SimpleString) {
|
||||
value = value.toString()
|
||||
}
|
||||
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
val sendableMessage = amqpClient.createMessage(data, peerInbox,
|
||||
legalNames.first().toString(),
|
||||
properties)
|
||||
sendableMessage.onComplete.then {
|
||||
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
||||
lock.withLock {
|
||||
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
|
||||
artemisMessage.acknowledge()
|
||||
} else {
|
||||
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
// We need to commit any acknowledged messages before rolling back the failed
|
||||
// (unacknowledged) message.
|
||||
session?.commit()
|
||||
session?.rollback(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
amqpClient.write(sendableMessage)
|
||||
properties[key.toString()] = value
|
||||
}
|
||||
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
val sendableMessage = amqpClient.createMessage(data, peerInbox,
|
||||
legalNames.first().toString(),
|
||||
properties)
|
||||
sendableMessage.onComplete.then {
|
||||
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
||||
lock.withLock {
|
||||
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
|
||||
artemisMessage.acknowledge()
|
||||
} else {
|
||||
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
// We need to commit any acknowledged messages before rolling back the failed
|
||||
// (unacknowledged) message.
|
||||
session?.commit()
|
||||
session?.rollback(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
amqpClient.write(sendableMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -53,6 +53,7 @@ class NetworkBootstrapper {
|
||||
|
||||
private const val LOGS_DIR_NAME = "logs"
|
||||
private const val WHITELIST_FILE_NAME = "whitelist.txt"
|
||||
private const val EXCLUDE_WHITELIST_FILE_NAME = "exclude_whitelist.txt"
|
||||
|
||||
@JvmStatic
|
||||
fun main(args: Array<String>) {
|
||||
@ -79,7 +80,7 @@ class NetworkBootstrapper {
|
||||
println("Gathering notary identities")
|
||||
val notaryInfos = gatherNotaryInfos(nodeInfoFiles)
|
||||
println("Notary identities to be used in network parameters: ${notaryInfos.joinToString("; ") { it.prettyPrint() }}")
|
||||
val mergedWhiteList = generateWhitelist(directory / WHITELIST_FILE_NAME, cordapps?.distinct())
|
||||
val mergedWhiteList = generateWhitelist(directory / WHITELIST_FILE_NAME, directory / EXCLUDE_WHITELIST_FILE_NAME, cordapps?.distinct())
|
||||
println("Updating whitelist")
|
||||
overwriteWhitelist(directory / WHITELIST_FILE_NAME, mergedWhiteList)
|
||||
installNetworkParameters(notaryInfos, nodeDirs, mergedWhiteList)
|
||||
@ -187,21 +188,24 @@ class NetworkBootstrapper {
|
||||
nodeDirs.forEach { copier.install(it) }
|
||||
}
|
||||
|
||||
private fun generateWhitelist(whitelistFile: Path, cordapps: List<String>?): Map<String, List<AttachmentId>> {
|
||||
private fun generateWhitelist(whitelistFile: Path, excludeWhitelistFile: Path, cordapps: List<String>?): Map<String, List<AttachmentId>> {
|
||||
val existingWhitelist = if (whitelistFile.exists()) readContractWhitelist(whitelistFile) else emptyMap()
|
||||
|
||||
println("Found existing whitelist:")
|
||||
existingWhitelist.forEach { println(it.outputString()) }
|
||||
println(if (existingWhitelist.isEmpty()) "No existing whitelist file found." else "Found existing whitelist: ${whitelistFile}")
|
||||
|
||||
val newWhiteList: Map<ContractClassName, AttachmentId> = cordapps?.flatMap { cordappJarPath ->
|
||||
val excludeContracts = if (excludeWhitelistFile.exists()) readExcludeWhitelist(excludeWhitelistFile) else emptyList()
|
||||
if (excludeContracts.isNotEmpty()) {
|
||||
println("Exclude contracts from whitelist: ${excludeContracts.joinToString()}}")
|
||||
}
|
||||
|
||||
val newWhiteList = cordapps?.flatMap { cordappJarPath ->
|
||||
val jarHash = getJarHash(cordappJarPath)
|
||||
scanJarForContracts(cordappJarPath).map { contract ->
|
||||
contract to jarHash
|
||||
}
|
||||
}?.toMap() ?: emptyMap()
|
||||
}?.filter { (contractClassName, _) -> contractClassName !in excludeContracts }?.toMap() ?: emptyMap()
|
||||
|
||||
println("Calculating whitelist for current CorDapps:")
|
||||
newWhiteList.forEach { (contract, attachment) -> println("$contract:$attachment") }
|
||||
println("Calculating whitelist for current installed CorDapps..")
|
||||
|
||||
val merged = (newWhiteList.keys + existingWhitelist.keys).map { contractClassName ->
|
||||
val existing = existingWhitelist[contractClassName] ?: emptyList()
|
||||
@ -209,15 +213,15 @@ class NetworkBootstrapper {
|
||||
contractClassName to (if (newHash == null || newHash in existing) existing else existing + newHash)
|
||||
}.toMap()
|
||||
|
||||
println("Final whitelist:")
|
||||
merged.forEach { println(it.outputString()) }
|
||||
|
||||
println("CorDapp whitelist " + (if (existingWhitelist.isEmpty()) "generated" else "updated") + " in ${whitelistFile}")
|
||||
return merged
|
||||
}
|
||||
|
||||
private fun overwriteWhitelist(whitelistFile: Path, mergedWhiteList: Map<String, List<AttachmentId>>) {
|
||||
PrintStream(whitelistFile.toFile().outputStream()).use { out ->
|
||||
mergedWhiteList.forEach { out.println(it.outputString()) }
|
||||
mergedWhiteList.forEach { (contract, attachments) ->
|
||||
out.println("${contract}:${attachments.joinToString(",")}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -227,12 +231,14 @@ class NetworkBootstrapper {
|
||||
SecureHash.SHA256(hs.hash().asBytes())
|
||||
}
|
||||
|
||||
private fun readContractWhitelist(file: Path): Map<String, List<AttachmentId>> = file.toFile().readLines()
|
||||
private fun readContractWhitelist(file: Path): Map<String, List<AttachmentId>> = file.readAllLines()
|
||||
.map { line -> line.split(":") }
|
||||
.map { (contract, attachmentIds) ->
|
||||
contract to (attachmentIds.split(",").map(::parse))
|
||||
}.toMap()
|
||||
|
||||
private fun readExcludeWhitelist(file: Path): List<String> = file.readAllLines().map(String::trim)
|
||||
|
||||
private fun NotaryInfo.prettyPrint(): String = "${identity.name} (${if (validating) "" else "non-"}validating)"
|
||||
|
||||
private fun NodeInfo.notaryIdentity(): Party {
|
||||
@ -246,8 +252,6 @@ class NetworkBootstrapper {
|
||||
}
|
||||
}
|
||||
|
||||
private fun Map.Entry<ContractClassName, List<AttachmentId>>.outputString() = "$key:${value.joinToString(",")}"
|
||||
|
||||
// We need to to set serialization env, because generation of parameters is run from Cordform.
|
||||
// KryoServerSerializationScheme is not accessible from nodeapi.
|
||||
private fun initialiseSerialization() {
|
||||
|
@ -29,14 +29,7 @@ val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(kryoMagic,
|
||||
true,
|
||||
SerializationContext.UseCase.RPCServer,
|
||||
null)
|
||||
val KRYO_STORAGE_CONTEXT = SerializationContextImpl(kryoMagic,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
AllButBlacklisted,
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.Storage,
|
||||
null,
|
||||
AlwaysAcceptEncodingWhitelist)
|
||||
|
||||
val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
AllButBlacklisted,
|
||||
@ -45,6 +38,7 @@ val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic,
|
||||
SerializationContext.UseCase.Storage,
|
||||
null,
|
||||
AlwaysAcceptEncodingWhitelist)
|
||||
|
||||
val AMQP_RPC_SERVER_CONTEXT = SerializationContextImpl(amqpMagic,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
|
@ -17,14 +17,6 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||
* CANNOT always be instantiated outside of the server and so
|
||||
* MUST be kept separate from these ones!
|
||||
*/
|
||||
|
||||
val KRYO_P2P_CONTEXT = SerializationContextImpl(kryoMagic,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.P2P,
|
||||
null)
|
||||
val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(kryoMagic,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
QuasarWhitelist,
|
||||
@ -33,6 +25,7 @@ val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(kryoMagic,
|
||||
SerializationContext.UseCase.Checkpoint,
|
||||
SNAPPY,
|
||||
AlwaysAcceptEncodingWhitelist)
|
||||
|
||||
val AMQP_P2P_CONTEXT = SerializationContextImpl(amqpMagic,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
|
@ -2,13 +2,16 @@
|
||||
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.internal.objectOrNewInstance
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.nodeapi.internal.serialization.DefaultWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.MutableClassWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.SerializationScheme
|
||||
import java.lang.reflect.Modifier
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
@ -28,10 +31,20 @@ fun SerializerFactory.addToWhitelist(vararg types: Class<*>) {
|
||||
|
||||
abstract class AbstractAMQPSerializationScheme(val cordappLoader: List<Cordapp>) : SerializationScheme {
|
||||
|
||||
// TODO: This method of initialisation for the Whitelist and plugin serializers will have to change
|
||||
// when we have per-cordapp contexts and dynamic app reloading but for now it's the easiest way
|
||||
companion object {
|
||||
private val serializationWhitelists: List<SerializationWhitelist> by lazy {
|
||||
ServiceLoader.load(SerializationWhitelist::class.java, this::class.java.classLoader).toList() + DefaultWhitelist
|
||||
}
|
||||
|
||||
private val customSerializers: List<SerializationCustomSerializer<*, *>> by lazy {
|
||||
FastClasspathScanner().addClassLoader(this::class.java.classLoader).scan()
|
||||
.getNamesOfClassesImplementing(SerializationCustomSerializer::class.java)
|
||||
.mapNotNull { this::class.java.classLoader.loadClass(it).asSubclass(SerializationCustomSerializer::class.java) }
|
||||
.filterNot { Modifier.isAbstract(it.modifiers) }
|
||||
.map { it.kotlin.objectOrNewInstance() }
|
||||
}
|
||||
}
|
||||
|
||||
private fun registerCustomSerializers(factory: SerializerFactory) {
|
||||
@ -69,11 +82,20 @@ abstract class AbstractAMQPSerializationScheme(val cordappLoader: List<Cordapp>)
|
||||
factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray())
|
||||
}
|
||||
|
||||
for (loader in cordappLoader) {
|
||||
for (schema in loader.serializationCustomSerializers) {
|
||||
factory.registerExternal(CorDappCustomSerializer(schema, factory))
|
||||
// If we're passed in an external list we trust that, otherwise revert to looking at the scan of the
|
||||
// classpath to find custom serializers.
|
||||
if (cordappLoader.isEmpty()) {
|
||||
for (customSerializer in customSerializers) {
|
||||
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
|
||||
}
|
||||
} else {
|
||||
cordappLoader.forEach { loader ->
|
||||
for (customSerializer in loader.serializationCustomSerializers) {
|
||||
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private val serializerFactoriesForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>()
|
||||
|
@ -105,6 +105,7 @@ dependencies {
|
||||
|
||||
// Jackson support: serialisation to/from JSON, YAML, etc
|
||||
compile project(':client:jackson')
|
||||
compile group: 'org.json', name: 'json', version: json_version
|
||||
|
||||
// Coda Hale's Metrics: for monitoring of key statistics
|
||||
compile "io.dropwizard.metrics:metrics-core:3.1.2"
|
||||
|
@ -6,6 +6,10 @@ import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.CertChainPolicyConfig
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
@ -27,13 +31,14 @@ import java.util.*
|
||||
import kotlin.system.measureNanoTime
|
||||
import kotlin.system.measureTimeMillis
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotEquals
|
||||
|
||||
class AMQPBridgeTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val temporaryFolder = TemporaryFolder()
|
||||
|
||||
private val log = loggerFor<AMQPBridgeTest>()
|
||||
|
||||
private val ALICE = TestIdentity(ALICE_NAME)
|
||||
private val BOB = TestIdentity(BOB_NAME)
|
||||
|
||||
@ -68,14 +73,16 @@ class AMQPBridgeTest {
|
||||
|
||||
//Create target server
|
||||
val amqpServer = createAMQPServer()
|
||||
val dedupeSet = mutableSetOf<String>()
|
||||
|
||||
val receive = amqpServer.onReceive.toBlocking().iterator
|
||||
amqpServer.start()
|
||||
|
||||
val receivedSequence = mutableListOf<Int>()
|
||||
val atNodeSequence = mutableListOf<Int>()
|
||||
|
||||
fun formatMessage(expected: String, actual: Int, received: List<Int>): String {
|
||||
return "Expected message with id $expected, got $actual, previous message receive sequence: "
|
||||
return "Expected message with id $expected, got $actual, previous message receive sequence: " +
|
||||
"${received.joinToString(", ", "[", "]")}."
|
||||
}
|
||||
|
||||
@ -83,66 +90,84 @@ class AMQPBridgeTest {
|
||||
val messageID1 = received1.applicationProperties["CountProp"] as Int
|
||||
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload)
|
||||
assertEquals(0, messageID1)
|
||||
dedupeSet += received1.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||
received1.complete(true) // Accept first message
|
||||
receivedSequence.add(messageID1)
|
||||
receivedSequence += messageID1
|
||||
atNodeSequence += messageID1
|
||||
|
||||
val received2 = receive.next()
|
||||
val messageID2 = received2.applicationProperties["CountProp"] as Int
|
||||
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload)
|
||||
assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence))
|
||||
received2.complete(false) // Reject message
|
||||
receivedSequence.add(messageID2)
|
||||
received2.complete(false) // Reject message and don't add to dedupe
|
||||
receivedSequence += messageID2 // reflects actual sequence
|
||||
|
||||
// drop things until we get back to the replay
|
||||
while (true) {
|
||||
val received3 = receive.next()
|
||||
val messageID3 = received3.applicationProperties["CountProp"] as Int
|
||||
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload)
|
||||
assertNotEquals(0, messageID3, formatMessage("< 1", messageID3, receivedSequence))
|
||||
receivedSequence.add(messageID3)
|
||||
receivedSequence += messageID3
|
||||
if (messageID3 != 1) { // keep rejecting any batched items following rejection
|
||||
received3.complete(false)
|
||||
} else { // beginnings of replay so accept again
|
||||
received3.complete(true)
|
||||
val messageId = received3.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||
if (messageId !in dedupeSet) {
|
||||
dedupeSet += messageId
|
||||
atNodeSequence += messageID3
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// start receiving again, but discarding duplicates
|
||||
while (true) {
|
||||
val received4 = receive.next()
|
||||
val messageID4 = received4.applicationProperties["CountProp"] as Int
|
||||
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload)
|
||||
receivedSequence.add(messageID4)
|
||||
if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip
|
||||
assertEquals(2, messageID4) // next message should be in order though
|
||||
break
|
||||
receivedSequence += messageID4
|
||||
val messageId = received4.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||
if (messageId !in dedupeSet) {
|
||||
dedupeSet += messageId
|
||||
atNodeSequence += messageID4
|
||||
}
|
||||
received4.complete(true)
|
||||
if (messageID4 == 2) { // started to replay messages after rejection point
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Send a fresh item and check receive
|
||||
val artemisMessage = artemis.session.createMessage(true).apply {
|
||||
putIntProperty("CountProp", -1)
|
||||
writeBodyBufferBytes("Test_end".toByteArray())
|
||||
putIntProperty("CountProp", 3)
|
||||
writeBodyBufferBytes("Test3".toByteArray())
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
artemis.producer.send(sourceQueueName, artemisMessage)
|
||||
|
||||
|
||||
// start receiving again, discarding duplicates
|
||||
while (true) {
|
||||
val received5 = receive.next()
|
||||
val messageID5 = received5.applicationProperties["CountProp"] as Int
|
||||
if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip
|
||||
assertEquals(-1, messageID5, formatMessage("-1", messageID5, receivedSequence)) // next message should be in order though
|
||||
assertArrayEquals("Test_end".toByteArray(), received5.payload)
|
||||
receivedSequence.add(messageID5)
|
||||
assertArrayEquals("Test$messageID5".toByteArray(), received5.payload)
|
||||
receivedSequence += messageID5
|
||||
val messageId = received5.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||
if (messageId !in dedupeSet) {
|
||||
dedupeSet += messageId
|
||||
atNodeSequence += messageID5
|
||||
}
|
||||
received5.complete(true)
|
||||
if (messageID5 == 3) { // reached our fresh message
|
||||
break
|
||||
}
|
||||
receivedSequence.add(messageID5)
|
||||
received5.complete(true)
|
||||
}
|
||||
|
||||
println("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}")
|
||||
log.info("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}")
|
||||
log.info("Deduped sequence: ${atNodeSequence.joinToString(", ", "[", "]")}")
|
||||
assertEquals(listOf(0, 1, 2, 3), atNodeSequence)
|
||||
bridgeManager.stop()
|
||||
amqpServer.stop()
|
||||
artemisClient.stop()
|
||||
|
@ -1,6 +1,5 @@
|
||||
package net.corda.node.shell
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory
|
||||
import com.fasterxml.jackson.core.JsonGenerator
|
||||
import com.fasterxml.jackson.core.JsonParser
|
||||
import com.fasterxml.jackson.databind.*
|
||||
@ -15,6 +14,7 @@ import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
@ -22,7 +22,9 @@ import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.messaging.StateMachineUpdate
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.internal.security.AdminSubject
|
||||
@ -52,6 +54,7 @@ import org.crsh.util.Utils
|
||||
import org.crsh.vfs.FS
|
||||
import org.crsh.vfs.spi.file.FileMountFactory
|
||||
import org.crsh.vfs.spi.url.ClassPathMountFactory
|
||||
import org.json.JSONObject
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
@ -196,13 +199,35 @@ object InteractiveShell {
|
||||
}
|
||||
}
|
||||
|
||||
private fun createOutputMapper(factory: JsonFactory): ObjectMapper {
|
||||
return JacksonSupport.createNonRpcMapper(factory).apply {
|
||||
private object NodeInfoSerializer : JsonSerializer<NodeInfo>() {
|
||||
|
||||
override fun serialize(nodeInfo: NodeInfo, gen: JsonGenerator, serializers: SerializerProvider) {
|
||||
|
||||
val json = JSONObject()
|
||||
json["addresses"] = nodeInfo.addresses.map { address -> address.serialise() }
|
||||
json["legalIdentities"] = nodeInfo.legalIdentities.map { address -> address.serialise() }
|
||||
json["platformVersion"] = nodeInfo.platformVersion
|
||||
json["serial"] = nodeInfo.serial
|
||||
gen.writeRaw(json.toString())
|
||||
}
|
||||
|
||||
private fun NetworkHostAndPort.serialise() = this.toString()
|
||||
private fun Party.serialise() = JSONObject().put("name", this.name)
|
||||
|
||||
private operator fun JSONObject.set(key: String, value: Any?): JSONObject {
|
||||
return put(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
private fun createOutputMapper(): ObjectMapper {
|
||||
|
||||
return JacksonSupport.createNonRpcMapper().apply {
|
||||
// Register serializers for stateful objects from libraries that are special to the RPC system and don't
|
||||
// make sense to print out to the screen. For classes we own, annotations can be used instead.
|
||||
val rpcModule = SimpleModule()
|
||||
rpcModule.addSerializer(Observable::class.java, ObservableSerializer)
|
||||
rpcModule.addSerializer(InputStream::class.java, InputStreamSerializer)
|
||||
rpcModule.addSerializer(NodeInfo::class.java, NodeInfoSerializer)
|
||||
registerModule(rpcModule)
|
||||
|
||||
disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
|
||||
@ -211,7 +236,7 @@ object InteractiveShell {
|
||||
}
|
||||
|
||||
// TODO: This should become the default renderer rather than something used specifically by commands.
|
||||
private val yamlMapper by lazy { createOutputMapper(YAMLFactory()) }
|
||||
private val outputMapper by lazy { createOutputMapper() }
|
||||
|
||||
/**
|
||||
* Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out
|
||||
@ -394,11 +419,19 @@ object InteractiveShell {
|
||||
return result
|
||||
}
|
||||
|
||||
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture<Unit> {
|
||||
val printerFun = yamlMapper::writeValueAsString
|
||||
toStream.println(printerFun(response))
|
||||
toStream.flush()
|
||||
return maybeFollow(response, printerFun, toStream)
|
||||
private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture<Unit> {
|
||||
|
||||
val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) }
|
||||
val mappingFunction: (Any?) -> String = { value ->
|
||||
if (value is Collection<*>) {
|
||||
value.joinToString(",${System.lineSeparator()} ", "[${System.lineSeparator()} ", "${System.lineSeparator()}]") { element ->
|
||||
mapElement(element)
|
||||
}
|
||||
} else {
|
||||
mapElement(value)
|
||||
}
|
||||
}
|
||||
return maybeFollow(response, mappingFunction, out)
|
||||
}
|
||||
|
||||
private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() {
|
||||
@ -421,6 +454,7 @@ object InteractiveShell {
|
||||
override fun onNext(t: Any?) {
|
||||
count++
|
||||
toStream.println("Observation $count: " + printerFun(t))
|
||||
toStream.flush()
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
@ -431,25 +465,32 @@ object InteractiveShell {
|
||||
}
|
||||
}
|
||||
|
||||
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CordaFuture<Unit> {
|
||||
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
|
||||
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
|
||||
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
|
||||
// object graphs that contain yet more observables. So we just look for top level responses that follow
|
||||
// the standard "track" pattern, and print them until the user presses Ctrl-C
|
||||
if (response == null) return doneFuture(Unit)
|
||||
|
||||
val observable: Observable<*> = when (response) {
|
||||
is Observable<*> -> response
|
||||
is DataFeed<*, *> -> {
|
||||
toStream.println("Snapshot")
|
||||
toStream.println(response.snapshot)
|
||||
response.updates
|
||||
}
|
||||
else -> return doneFuture(Unit)
|
||||
if (response is DataFeed<*, *>) {
|
||||
out.println("Snapshot:")
|
||||
out.println(printerFun(response.snapshot))
|
||||
out.flush()
|
||||
out.println("Updates:")
|
||||
return printNextElements(response.updates, printerFun, out)
|
||||
}
|
||||
if (response is Observable<*>) {
|
||||
return printNextElements(response, printerFun, out)
|
||||
}
|
||||
|
||||
val subscriber = PrintingSubscriber(printerFun, toStream)
|
||||
uncheckedCast(observable).subscribe(subscriber)
|
||||
out.println(printerFun(response))
|
||||
return doneFuture(Unit)
|
||||
}
|
||||
|
||||
private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
|
||||
|
||||
val subscriber = PrintingSubscriber(printerFun, out)
|
||||
uncheckedCast(elements).subscribe(subscriber)
|
||||
return subscriber.future
|
||||
}
|
||||
|
||||
|
@ -6,7 +6,7 @@ import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.toHexString
|
||||
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||
import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||
@ -27,10 +27,10 @@ private class EnclaveletSerializationScheme {
|
||||
registerScheme(AMQPVerifierSerializationScheme)
|
||||
},
|
||||
/**
|
||||
* Even though default context is set to Kryo P2P, the encoding will be adjusted depending on the
|
||||
* Even though default context is set to Amqp P2P, the encoding will be adjusted depending on the
|
||||
* incoming request received.
|
||||
*/
|
||||
KRYO_P2P_CONTEXT)
|
||||
AMQP_P2P_CONTEXT)
|
||||
|
||||
/*
|
||||
* Ensure that we initialise JAXP before blacklisting is enabled.
|
||||
|
Loading…
Reference in New Issue
Block a user