mirror of
https://github.com/corda/corda.git
synced 2025-01-16 01:40:17 +00:00
Merge remote-tracking branch 'open/master' into aslemmer-enterprise-merge-september-8
This commit is contained in:
commit
39b9df8073
@ -1,3 +1,4 @@
|
|||||||
|
@file:JvmName("ObservableFold")
|
||||||
package net.corda.client.jfx.utils
|
package net.corda.client.jfx.utils
|
||||||
|
|
||||||
import javafx.application.Platform
|
import javafx.application.Platform
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
@file:JvmName("ObservableUtilities")
|
||||||
package net.corda.client.jfx.utils
|
package net.corda.client.jfx.utils
|
||||||
|
|
||||||
import javafx.application.Platform
|
import javafx.application.Platform
|
||||||
|
@ -81,15 +81,16 @@ open class ReadOnlyBackedObservableMapBase<K, A, B> : ObservableMap<K, A> {
|
|||||||
throw UnsupportedOperationException("remove() can't be called on ReadOnlyObservableMapBase")
|
throw UnsupportedOperationException("remove() can't be called on ReadOnlyObservableMapBase")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
/**
|
||||||
|
* Construct an object modelling the given change to an observed map.
|
||||||
fun <A, K> ObservableMap<K, A>.createMapChange(key: K, removedValue: A?, addedValue: A?): MapChangeListener.Change<K, A> {
|
*/
|
||||||
return object : MapChangeListener.Change<K, A>(this) {
|
fun createMapChange(key: K, removedValue: A?, addedValue: A?): MapChangeListener.Change<K, A> {
|
||||||
override fun getKey() = key
|
return object : MapChangeListener.Change<K, A>(this) {
|
||||||
override fun wasRemoved() = removedValue != null
|
override fun getKey() = key
|
||||||
override fun wasAdded() = addedValue != null
|
override fun wasRemoved() = removedValue != null
|
||||||
override fun getValueRemoved() = removedValue
|
override fun wasAdded() = addedValue != null
|
||||||
override fun getValueAdded() = addedValue
|
override fun getValueRemoved() = removedValue
|
||||||
|
override fun getValueAdded() = addedValue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ import java.util.*
|
|||||||
* [Generator.choice] picks a generator from the specified list and runs that.
|
* [Generator.choice] picks a generator from the specified list and runs that.
|
||||||
* [Generator.frequency] is similar to [choice] but the probability may be specified for each generator (it is normalised before picking).
|
* [Generator.frequency] is similar to [choice] but the probability may be specified for each generator (it is normalised before picking).
|
||||||
* [Generator.combine] combines two generators of A and B with a function (A, B) -> C. Variants exist for other arities.
|
* [Generator.combine] combines two generators of A and B with a function (A, B) -> C. Variants exist for other arities.
|
||||||
* [Generator.flatMap] sequences two generators using an arbitrary A->Generator<B> function. Keep the usage of this
|
* [Generator.flatMap] sequences two generators using an arbitrary A->Generator<B> function. Keep the usage of this
|
||||||
* function minimal as it may explode the stack, especially when using recursion.
|
* function minimal as it may explode the stack, especially when using recursion.
|
||||||
*
|
*
|
||||||
* There are other utilities as well, the type of which are usually descriptive.
|
* There are other utilities as well, the type of which are usually descriptive.
|
||||||
@ -32,7 +32,6 @@ import java.util.*
|
|||||||
* The above will generate a random list of animals.
|
* The above will generate a random list of animals.
|
||||||
*/
|
*/
|
||||||
class Generator<out A>(val generate: (SplittableRandom) -> Try<A>) {
|
class Generator<out A>(val generate: (SplittableRandom) -> Try<A>) {
|
||||||
|
|
||||||
// Functor
|
// Functor
|
||||||
fun <B> map(function: (A) -> B): Generator<B> =
|
fun <B> map(function: (A) -> B): Generator<B> =
|
||||||
Generator { generate(it).map(function) }
|
Generator { generate(it).map(function) }
|
||||||
@ -58,16 +57,42 @@ class Generator<out A>(val generate: (SplittableRandom) -> Try<A>) {
|
|||||||
return Generator { random -> generate(random).flatMap { function(it).generate(random) } }
|
return Generator { random -> generate(random).flatMap { function(it).generate(random) } }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun generateOrFail(random: SplittableRandom, numberOfTries: Int = 1): A {
|
||||||
|
var error: Throwable? = null
|
||||||
|
for (i in 0..numberOfTries - 1) {
|
||||||
|
val result = generate(random)
|
||||||
|
error = when (result) {
|
||||||
|
is Try.Success -> return result.value
|
||||||
|
is Try.Failure -> result.exception
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (error == null) {
|
||||||
|
throw IllegalArgumentException("numberOfTries cannot be <= 0")
|
||||||
|
} else {
|
||||||
|
throw Exception("Failed to generate", error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
fun <A> pure(value: A) = Generator { Try.Success(value) }
|
fun <A> pure(value: A) = Generator { Try.Success(value) }
|
||||||
fun <A> impure(valueClosure: () -> A) = Generator { Try.Success(valueClosure()) }
|
fun <A> impure(valueClosure: () -> A) = Generator { Try.Success(valueClosure()) }
|
||||||
fun <A> fail(error: Exception) = Generator<A> { Try.Failure(error) }
|
fun <A> fail(error: Exception) = Generator<A> { Try.Failure(error) }
|
||||||
|
|
||||||
// Alternative
|
/**
|
||||||
|
* Pick a generator from the specified list and run it.
|
||||||
|
*/
|
||||||
fun <A> choice(generators: List<Generator<A>>) = intRange(0, generators.size - 1).flatMap { generators[it] }
|
fun <A> choice(generators: List<Generator<A>>) = intRange(0, generators.size - 1).flatMap { generators[it] }
|
||||||
|
|
||||||
fun <A> success(generate: (SplittableRandom) -> A) = Generator { Try.Success(generate(it)) }
|
fun <A> success(generate: (SplittableRandom) -> A) = Generator { Try.Success(generate(it)) }
|
||||||
|
/**
|
||||||
|
* Pick a generator from the specified list, with a probability assigned to each generator, then run the
|
||||||
|
* chosen generator.
|
||||||
|
*
|
||||||
|
* @param generators a list of probabilities of a generator being chosen, and generators. Probabilities must be
|
||||||
|
* non-negative.
|
||||||
|
*/
|
||||||
fun <A> frequency(generators: List<Pair<Double, Generator<A>>>): Generator<A> {
|
fun <A> frequency(generators: List<Pair<Double, Generator<A>>>): Generator<A> {
|
||||||
|
require(generators.all { it.first >= 0.0 }) { "Probabilities must not be negative" }
|
||||||
val ranges = mutableListOf<Pair<Double, Double>>()
|
val ranges = mutableListOf<Pair<Double, Double>>()
|
||||||
var current = 0.0
|
var current = 0.0
|
||||||
generators.forEach {
|
generators.forEach {
|
||||||
@ -88,6 +113,8 @@ class Generator<out A>(val generate: (SplittableRandom) -> Try<A>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun <A> frequency(vararg generators: Pair<Double, Generator<A>>) = frequency(generators.toList())
|
||||||
|
|
||||||
fun <A> sequence(generators: List<Generator<A>>) = Generator<List<A>> {
|
fun <A> sequence(generators: List<Generator<A>>) = Generator<List<A>> {
|
||||||
val result = mutableListOf<A>()
|
val result = mutableListOf<A>()
|
||||||
for (generator in generators) {
|
for (generator in generators) {
|
||||||
@ -99,129 +126,113 @@ class Generator<out A>(val generate: (SplittableRandom) -> Try<A>) {
|
|||||||
}
|
}
|
||||||
Try.Success(result)
|
Try.Success(result)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun <A> Generator.Companion.frequency(vararg generators: Pair<Double, Generator<A>>) = frequency(generators.toList())
|
fun int() = Generator.success(SplittableRandom::nextInt)
|
||||||
|
fun long() = Generator.success(SplittableRandom::nextLong)
|
||||||
fun <A> Generator<A>.generateOrFail(random: SplittableRandom, numberOfTries: Int = 1): A {
|
fun bytes(size: Int): Generator<ByteArray> = Generator.success { random ->
|
||||||
var error: Throwable? = null
|
ByteArray(size) { random.nextInt().toByte() }
|
||||||
for (i in 0..numberOfTries - 1) {
|
|
||||||
val result = generate(random)
|
|
||||||
error = when (result) {
|
|
||||||
is Try.Success -> return result.value
|
|
||||||
is Try.Failure -> result.exception
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (error == null) {
|
|
||||||
throw IllegalArgumentException("numberOfTries cannot be <= 0")
|
|
||||||
} else {
|
|
||||||
throw Exception("Failed to generate", error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun Generator.Companion.int() = Generator.success(SplittableRandom::nextInt)
|
fun intRange(range: IntRange) = intRange(range.first, range.last)
|
||||||
fun Generator.Companion.long() = Generator.success(SplittableRandom::nextLong)
|
fun intRange(from: Int, to: Int): Generator<Int> = Generator.success {
|
||||||
fun Generator.Companion.bytes(size: Int): Generator<ByteArray> = Generator.success { random ->
|
(from + Math.abs(it.nextInt()) % (to - from + 1)).toInt()
|
||||||
ByteArray(size) { random.nextInt().toByte() }
|
}
|
||||||
}
|
|
||||||
|
|
||||||
fun Generator.Companion.intRange(range: IntRange) = intRange(range.first, range.last)
|
fun longRange(range: LongRange) = longRange(range.first, range.last)
|
||||||
fun Generator.Companion.intRange(from: Int, to: Int): Generator<Int> = Generator.success {
|
fun longRange(from: Long, to: Long): Generator<Long> = Generator.success {
|
||||||
(from + Math.abs(it.nextInt()) % (to - from + 1)).toInt()
|
(from + Math.abs(it.nextLong()) % (to - from + 1)).toLong()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun Generator.Companion.longRange(range: LongRange) = longRange(range.first, range.last)
|
fun double() = Generator.success { it.nextDouble() }
|
||||||
fun Generator.Companion.longRange(from: Long, to: Long): Generator<Long> = Generator.success {
|
fun doubleRange(from: Double, to: Double): Generator<Double> = Generator.success {
|
||||||
(from + Math.abs(it.nextLong()) % (to - from + 1)).toLong()
|
from + it.nextDouble() * (to - from)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun Generator.Companion.double() = Generator.success { it.nextDouble() }
|
fun char() = Generator {
|
||||||
fun Generator.Companion.doubleRange(from: Double, to: Double): Generator<Double> = Generator.success {
|
val codePoint = Math.abs(it.nextInt()) % (17 * (1 shl 16))
|
||||||
from + it.nextDouble() * (to - from)
|
if (Character.isValidCodePoint(codePoint)) {
|
||||||
}
|
return@Generator Try.Success(codePoint.toChar())
|
||||||
|
|
||||||
fun Generator.Companion.char() = Generator {
|
|
||||||
val codePoint = Math.abs(it.nextInt()) % (17 * (1 shl 16))
|
|
||||||
if (Character.isValidCodePoint(codePoint)) {
|
|
||||||
return@Generator Try.Success(codePoint.toChar())
|
|
||||||
} else {
|
|
||||||
Try.Failure(IllegalStateException("Could not generate valid codepoint"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun Generator.Companion.string(meanSize: Double = 16.0) = replicatePoisson(meanSize, char()).map {
|
|
||||||
val builder = StringBuilder()
|
|
||||||
it.forEach {
|
|
||||||
builder.append(it)
|
|
||||||
}
|
|
||||||
builder.toString()
|
|
||||||
}
|
|
||||||
|
|
||||||
fun <A> Generator.Companion.replicate(number: Int, generator: Generator<A>): Generator<List<A>> {
|
|
||||||
val generators = mutableListOf<Generator<A>>()
|
|
||||||
for (i in 1..number) {
|
|
||||||
generators.add(generator)
|
|
||||||
}
|
|
||||||
return sequence(generators)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
fun <A> Generator.Companion.replicatePoisson(meanSize: Double, generator: Generator<A>, atLeastOne: Boolean = false) = Generator<List<A>> {
|
|
||||||
val chance = (meanSize - 1) / meanSize
|
|
||||||
val result = mutableListOf<A>()
|
|
||||||
var finish = false
|
|
||||||
while (!finish) {
|
|
||||||
val res = Generator.doubleRange(0.0, 1.0).generate(it).flatMap { value ->
|
|
||||||
if (value < chance) {
|
|
||||||
generator.generate(it).map { result.add(it) }
|
|
||||||
} else {
|
} else {
|
||||||
finish = true
|
Try.Failure(IllegalStateException("Could not generate valid codepoint"))
|
||||||
if (result.isEmpty() && atLeastOne) {
|
|
||||||
generator.generate(it).map { result.add(it) }
|
|
||||||
} else Try.Success(Unit)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (res is Try.Failure) {
|
|
||||||
return@Generator res
|
fun string(meanSize: Double = 16.0) = replicatePoisson(meanSize, char()).map {
|
||||||
|
val builder = StringBuilder()
|
||||||
|
it.forEach {
|
||||||
|
builder.append(it)
|
||||||
|
}
|
||||||
|
builder.toString()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
Try.Success(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun <A> Generator.Companion.pickOne(list: List<A>) = Generator.intRange(0, list.size - 1).map { list[it] }
|
fun <A> replicate(number: Int, generator: Generator<A>): Generator<List<A>> {
|
||||||
fun <A> Generator.Companion.pickN(number: Int, list: List<A>) = Generator<List<A>> {
|
val generators = mutableListOf<Generator<A>>()
|
||||||
val mask = BitSet(list.size)
|
for (i in 1..number) {
|
||||||
val size = Math.min(list.size, number)
|
generators.add(generator)
|
||||||
for (i in 0..size - 1) {
|
}
|
||||||
// mask[i] = 1 desugars into mask.set(i, 1), which sets a range instead of a bit
|
return sequence(generators)
|
||||||
mask[i] = true
|
|
||||||
}
|
|
||||||
for (i in 0..list.size - 1) {
|
|
||||||
val bit = mask[i]
|
|
||||||
val swapIndex = i + it.nextInt(size - i)
|
|
||||||
mask[i] = mask[swapIndex]
|
|
||||||
mask[swapIndex] = bit
|
|
||||||
}
|
|
||||||
val resultList = ArrayList<A>()
|
|
||||||
list.forEachIndexed { index, a ->
|
|
||||||
if (mask[index]) {
|
|
||||||
resultList.add(a)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
Try.Success(resultList)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun <A> Generator.Companion.sampleBernoulli(maxRatio: Double = 1.0, vararg collection: A) =
|
|
||||||
sampleBernoulli(listOf(collection), maxRatio)
|
|
||||||
|
|
||||||
fun <A> Generator.Companion.sampleBernoulli(collection: Collection<A>, meanRatio: Double = 1.0): Generator<List<A>> =
|
fun <A> replicatePoisson(meanSize: Double, generator: Generator<A>, atLeastOne: Boolean = false) = Generator<List<A>> {
|
||||||
replicate(collection.size, Generator.doubleRange(0.0, 1.0)).map { chances ->
|
val chance = (meanSize - 1) / meanSize
|
||||||
val result = mutableListOf<A>()
|
val result = mutableListOf<A>()
|
||||||
collection.forEachIndexed { index, element ->
|
var finish = false
|
||||||
if (chances[index] < meanRatio) {
|
while (!finish) {
|
||||||
result.add(element)
|
val res = Generator.doubleRange(0.0, 1.0).generate(it).flatMap { value ->
|
||||||
|
if (value < chance) {
|
||||||
|
generator.generate(it).map { result.add(it) }
|
||||||
|
} else {
|
||||||
|
finish = true
|
||||||
|
if (result.isEmpty() && atLeastOne) {
|
||||||
|
generator.generate(it).map { result.add(it) }
|
||||||
|
} else Try.Success(Unit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (res is Try.Failure) {
|
||||||
|
return@Generator res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result
|
Try.Success(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun <A> pickOne(list: List<A>) = Generator.intRange(0, list.size - 1).map { list[it] }
|
||||||
|
fun <A> pickN(number: Int, list: List<A>) = Generator<List<A>> {
|
||||||
|
val mask = BitSet(list.size)
|
||||||
|
val size = Math.min(list.size, number)
|
||||||
|
for (i in 0..size - 1) {
|
||||||
|
// mask[i] = 1 desugars into mask.set(i, 1), which sets a range instead of a bit
|
||||||
|
mask[i] = true
|
||||||
|
}
|
||||||
|
for (i in 0..list.size - 1) {
|
||||||
|
val bit = mask[i]
|
||||||
|
val swapIndex = i + it.nextInt(size - i)
|
||||||
|
mask[i] = mask[swapIndex]
|
||||||
|
mask[swapIndex] = bit
|
||||||
|
}
|
||||||
|
val resultList = ArrayList<A>()
|
||||||
|
list.forEachIndexed { index, a ->
|
||||||
|
if (mask[index]) {
|
||||||
|
resultList.add(a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Try.Success(resultList)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun <A> sampleBernoulli(maxRatio: Double = 1.0, vararg collection: A) =
|
||||||
|
sampleBernoulli(listOf(collection), maxRatio)
|
||||||
|
|
||||||
|
fun <A> sampleBernoulli(collection: Collection<A>, meanRatio: Double = 1.0): Generator<List<A>> {
|
||||||
|
return replicate(collection.size, Generator.doubleRange(0.0, 1.0)).map { chances ->
|
||||||
|
val result = mutableListOf<A>()
|
||||||
|
collection.forEachIndexed { index, element ->
|
||||||
|
if (chances[index] < meanRatio) {
|
||||||
|
result.add(element)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -1,3 +1,4 @@
|
|||||||
|
@file:JvmName("Generators")
|
||||||
package net.corda.client.mock
|
package net.corda.client.mock
|
||||||
|
|
||||||
import net.corda.core.contracts.Amount
|
import net.corda.core.contracts.Amount
|
||||||
|
@ -82,6 +82,19 @@ class RPCClientProxyHandler(
|
|||||||
val log = loggerFor<RPCClientProxyHandler>()
|
val log = loggerFor<RPCClientProxyHandler>()
|
||||||
// To check whether toString() is being invoked
|
// To check whether toString() is being invoked
|
||||||
val toStringMethod: Method = Object::toString.javaMethod!!
|
val toStringMethod: Method = Object::toString.javaMethod!!
|
||||||
|
|
||||||
|
private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: Throwable) {
|
||||||
|
var currentThrowable = throwable
|
||||||
|
while (true) {
|
||||||
|
val cause = currentThrowable.cause
|
||||||
|
if (cause == null) {
|
||||||
|
currentThrowable.initCause(callSite)
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
currentThrowable = cause
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used for reaping
|
// Used for reaping
|
||||||
@ -393,6 +406,19 @@ object RpcClientObservableSerializer : Serializer<Observable<*>>() {
|
|||||||
return serializationContext.withProperty(RpcObservableContextKey, observableContext)
|
return serializationContext.withProperty(RpcObservableContextKey, observableContext)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun <T> pinInSubscriptions(observable: Observable<T>, hardReferenceStore: MutableSet<Observable<*>>): Observable<T> {
|
||||||
|
val refCount = AtomicInteger(0)
|
||||||
|
return observable.doOnSubscribe {
|
||||||
|
if (refCount.getAndIncrement() == 0) {
|
||||||
|
require(hardReferenceStore.add(observable)) { "Reference store already contained reference $this on add" }
|
||||||
|
}
|
||||||
|
}.doOnUnsubscribe {
|
||||||
|
if (refCount.decrementAndGet() == 0) {
|
||||||
|
require(hardReferenceStore.remove(observable)) { "Reference store did not contain reference $this on remove" }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun read(kryo: Kryo, input: Input, type: Class<Observable<*>>): Observable<Any> {
|
override fun read(kryo: Kryo, input: Input, type: Class<Observable<*>>): Observable<Any> {
|
||||||
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
|
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
|
||||||
val observableId = RPCApi.ObservableId(input.readLong(true))
|
val observableId = RPCApi.ObservableId(input.readLong(true))
|
||||||
@ -405,7 +431,7 @@ object RpcClientObservableSerializer : Serializer<Observable<*>>() {
|
|||||||
observableContext.callSiteMap?.put(observableId.toLong, rpcCallSite)
|
observableContext.callSiteMap?.put(observableId.toLong, rpcCallSite)
|
||||||
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
|
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
|
||||||
// don't need to store a reference to the Observables themselves.
|
// don't need to store a reference to the Observables themselves.
|
||||||
return observable.pinInSubscriptions(observableContext.hardReferenceStore).doOnUnsubscribe {
|
return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe {
|
||||||
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
|
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
|
||||||
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
|
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
|
||||||
// The unsubscribe is due to [ObservableToFuture]'s use of first().
|
// The unsubscribe is due to [ObservableToFuture]'s use of first().
|
||||||
@ -422,29 +448,3 @@ object RpcClientObservableSerializer : Serializer<Observable<*>>() {
|
|||||||
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
|
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: Throwable) {
|
|
||||||
var currentThrowable = throwable
|
|
||||||
while (true) {
|
|
||||||
val cause = currentThrowable.cause
|
|
||||||
if (cause == null) {
|
|
||||||
currentThrowable.initCause(callSite)
|
|
||||||
break
|
|
||||||
} else {
|
|
||||||
currentThrowable = cause
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun <T> Observable<T>.pinInSubscriptions(hardReferenceStore: MutableSet<Observable<*>>): Observable<T> {
|
|
||||||
val refCount = AtomicInteger(0)
|
|
||||||
return this.doOnSubscribe {
|
|
||||||
if (refCount.getAndIncrement() == 0) {
|
|
||||||
require(hardReferenceStore.add(this)) { "Reference store already contained reference $this on add" }
|
|
||||||
}
|
|
||||||
}.doOnUnsubscribe {
|
|
||||||
if (refCount.decrementAndGet() == 0) {
|
|
||||||
require(hardReferenceStore.remove(this)) { "Reference store did not contain reference $this on remove" }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
gradlePluginsVersion=0.16.1
|
gradlePluginsVersion=0.16.2
|
||||||
kotlinVersion=1.1.4
|
kotlinVersion=1.1.4
|
||||||
guavaVersion=21.0
|
guavaVersion=21.0
|
||||||
bouncycastleVersion=1.57
|
bouncycastleVersion=1.57
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
@file:JvmName("ConcurrencyUtils")
|
||||||
package net.corda.core.concurrent
|
package net.corda.core.concurrent
|
||||||
|
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
|
@ -262,7 +262,9 @@ abstract class SignTransactionFlow(val otherParty: Party,
|
|||||||
* @param stx a partially signed transaction received from your counter-party.
|
* @param stx a partially signed transaction received from your counter-party.
|
||||||
* @throws FlowException if the proposed transaction fails the checks.
|
* @throws FlowException if the proposed transaction fails the checks.
|
||||||
*/
|
*/
|
||||||
@Suspendable abstract protected fun checkTransaction(stx: SignedTransaction)
|
@Suspendable
|
||||||
|
@Throws(FlowException::class)
|
||||||
|
abstract protected fun checkTransaction(stx: SignedTransaction)
|
||||||
|
|
||||||
@Suspendable private fun checkMySignatureRequired(stx: SignedTransaction, signingKey: PublicKey) {
|
@Suspendable private fun checkMySignatureRequired(stx: SignedTransaction, signingKey: PublicKey) {
|
||||||
require(signingKey in stx.tx.requiredSigningKeys) {
|
require(signingKey in stx.tx.requiredSigningKeys) {
|
||||||
|
@ -57,16 +57,11 @@ data class FlowProgressHandleImpl<A>(
|
|||||||
|
|
||||||
// Remember to add @Throws to FlowProgressHandle.close() if this throws an exception.
|
// Remember to add @Throws to FlowProgressHandle.close() if this throws an exception.
|
||||||
override fun close() {
|
override fun close() {
|
||||||
progress.notUsed()
|
try {
|
||||||
|
progress.subscribe({}, {}).unsubscribe()
|
||||||
|
} catch (e: Exception) {
|
||||||
|
// Swallow any other exceptions as well.
|
||||||
|
}
|
||||||
returnValue.cancel(false)
|
returnValue.cancel(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Private copy of the version in client:rpc.
|
|
||||||
private fun <T> Observable<T>.notUsed() {
|
|
||||||
try {
|
|
||||||
this.subscribe({}, {}).unsubscribe()
|
|
||||||
} catch (e: Exception) {
|
|
||||||
// Swallow any other exceptions as well.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -25,5 +25,3 @@ data class ServiceInfo(val type: ServiceType, val name: X500Name? = null) {
|
|||||||
|
|
||||||
override fun toString() = if (name != null) "$type|$name" else type.toString()
|
override fun toString() = if (name != null) "$type|$name" else type.toString()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun Iterable<ServiceInfo>.containsType(type: ServiceType) = any { it.type == type }
|
|
||||||
|
@ -134,6 +134,7 @@ interface SerializationContext {
|
|||||||
* Helper method to return a new context based on this context with the appropriate class loader constructed from the passed attachment identifiers.
|
* Helper method to return a new context based on this context with the appropriate class loader constructed from the passed attachment identifiers.
|
||||||
* (Requires the attachment storage to have been enabled).
|
* (Requires the attachment storage to have been enabled).
|
||||||
*/
|
*/
|
||||||
|
@Throws(MissingAttachmentsException::class)
|
||||||
fun withAttachmentsClassLoader(attachmentHashes: List<SecureHash>): SerializationContext
|
fun withAttachmentsClassLoader(attachmentHashes: List<SecureHash>): SerializationContext
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -6,6 +6,7 @@ import net.corda.core.crypto.TransactionSignature
|
|||||||
import net.corda.core.utilities.toBase58String
|
import net.corda.core.utilities.toBase58String
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.node.ServiceHub
|
import net.corda.core.node.ServiceHub
|
||||||
|
import net.corda.core.serialization.CordaSerializable
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -13,6 +14,7 @@ import java.security.PublicKey
|
|||||||
* old and new notaries. Output states can be computed by applying the notary modification to corresponding inputs
|
* old and new notaries. Output states can be computed by applying the notary modification to corresponding inputs
|
||||||
* on the fly.
|
* on the fly.
|
||||||
*/
|
*/
|
||||||
|
@CordaSerializable
|
||||||
data class NotaryChangeWireTransaction(
|
data class NotaryChangeWireTransaction(
|
||||||
override val inputs: List<StateRef>,
|
override val inputs: List<StateRef>,
|
||||||
override val notary: Party,
|
override val notary: Party,
|
||||||
|
@ -41,14 +41,6 @@ class IdentitySyncFlowTests {
|
|||||||
val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
|
val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
|
||||||
val alice: Party = aliceNode.services.myInfo.legalIdentity
|
val alice: Party = aliceNode.services.myInfo.legalIdentity
|
||||||
val bob: Party = bobNode.services.myInfo.legalIdentity
|
val bob: Party = bobNode.services.myInfo.legalIdentity
|
||||||
aliceNode.database.transaction {
|
|
||||||
aliceNode.services.identityService.verifyAndRegisterIdentity(bobNode.info.legalIdentityAndCert)
|
|
||||||
aliceNode.services.identityService.verifyAndRegisterIdentity(notaryNode.info.legalIdentityAndCert)
|
|
||||||
}
|
|
||||||
bobNode.database.transaction {
|
|
||||||
bobNode.services.identityService.verifyAndRegisterIdentity(aliceNode.info.legalIdentityAndCert)
|
|
||||||
bobNode.services.identityService.verifyAndRegisterIdentity(notaryNode.info.legalIdentityAndCert)
|
|
||||||
}
|
|
||||||
bobNode.registerInitiatedFlow(Receive::class.java)
|
bobNode.registerInitiatedFlow(Receive::class.java)
|
||||||
|
|
||||||
// Alice issues then pays some cash to a new confidential identity that Bob doesn't know about
|
// Alice issues then pays some cash to a new confidential identity that Bob doesn't know about
|
||||||
|
@ -26,14 +26,6 @@ class TransactionKeyFlowTests {
|
|||||||
val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
|
val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
|
||||||
val alice: Party = aliceNode.services.myInfo.legalIdentity
|
val alice: Party = aliceNode.services.myInfo.legalIdentity
|
||||||
val bob: Party = bobNode.services.myInfo.legalIdentity
|
val bob: Party = bobNode.services.myInfo.legalIdentity
|
||||||
aliceNode.database.transaction {
|
|
||||||
aliceNode.services.identityService.verifyAndRegisterIdentity(bobNode.info.legalIdentityAndCert)
|
|
||||||
aliceNode.services.identityService.verifyAndRegisterIdentity(notaryNode.info.legalIdentityAndCert)
|
|
||||||
}
|
|
||||||
bobNode.database.transaction {
|
|
||||||
bobNode.services.identityService.verifyAndRegisterIdentity(aliceNode.info.legalIdentityAndCert)
|
|
||||||
bobNode.services.identityService.verifyAndRegisterIdentity(notaryNode.info.legalIdentityAndCert)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run the flows
|
// Run the flows
|
||||||
val requesterFlow = aliceNode.services.startFlow(TransactionKeyFlow(bob))
|
val requesterFlow = aliceNode.services.startFlow(TransactionKeyFlow(bob))
|
||||||
|
@ -10,6 +10,11 @@ the rest of the Corda libraries.
|
|||||||
currently known solution (such as publishing from buildSrc or setting up a separate project/repo) would
|
currently known solution (such as publishing from buildSrc or setting up a separate project/repo) would
|
||||||
introduce a two step build which is less convenient.
|
introduce a two step build which is less convenient.
|
||||||
|
|
||||||
|
Version number
|
||||||
|
--------------
|
||||||
|
|
||||||
|
To modify the version number edit constants.properties in root dir
|
||||||
|
|
||||||
Installing
|
Installing
|
||||||
----------
|
----------
|
||||||
|
|
||||||
|
@ -101,7 +101,9 @@ class Node extends CordformNode {
|
|||||||
protected void build() {
|
protected void build() {
|
||||||
configureRpcUsers()
|
configureRpcUsers()
|
||||||
installCordaJar()
|
installCordaJar()
|
||||||
installWebserverJar()
|
if (config.hasPath("webAddress")) {
|
||||||
|
installWebserverJar()
|
||||||
|
}
|
||||||
installBuiltPlugin()
|
installBuiltPlugin()
|
||||||
installCordapps()
|
installCordapps()
|
||||||
installConfig()
|
installConfig()
|
||||||
|
@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream
|
|||||||
import java.io.NotSerializableException
|
import java.io.NotSerializableException
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.ExecutionException
|
||||||
|
|
||||||
val attachmentsClassLoaderEnabledPropertyName = "attachments.class.loader.enabled"
|
val attachmentsClassLoaderEnabledPropertyName = "attachments.class.loader.enabled"
|
||||||
|
|
||||||
@ -42,19 +43,28 @@ data class SerializationContextImpl(override val preferredSerializationVersion:
|
|||||||
|
|
||||||
private val cache: Cache<List<SecureHash>, AttachmentsClassLoader> = CacheBuilder.newBuilder().weakValues().maximumSize(1024).build()
|
private val cache: Cache<List<SecureHash>, AttachmentsClassLoader> = CacheBuilder.newBuilder().weakValues().maximumSize(1024).build()
|
||||||
|
|
||||||
// We need to cache the AttachmentClassLoaders to avoid too many contexts, since the class loader is part of cache key for the context.
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* We need to cache the AttachmentClassLoaders to avoid too many contexts, since the class loader is part of cache key for the context.
|
||||||
|
*/
|
||||||
override fun withAttachmentsClassLoader(attachmentHashes: List<SecureHash>): SerializationContext {
|
override fun withAttachmentsClassLoader(attachmentHashes: List<SecureHash>): SerializationContext {
|
||||||
properties[attachmentsClassLoaderEnabledPropertyName] as? Boolean ?: false || return this
|
properties[attachmentsClassLoaderEnabledPropertyName] as? Boolean ?: false || return this
|
||||||
val serializationContext = properties[serializationContextKey] as? SerializeAsTokenContextImpl ?: return this // Some tests don't set one.
|
val serializationContext = properties[serializationContextKey] as? SerializeAsTokenContextImpl ?: return this // Some tests don't set one.
|
||||||
return withClassLoader(cache.get(attachmentHashes) {
|
try {
|
||||||
val missing = ArrayList<SecureHash>()
|
return withClassLoader(cache.get(attachmentHashes) {
|
||||||
val attachments = ArrayList<Attachment>()
|
val missing = ArrayList<SecureHash>()
|
||||||
attachmentHashes.forEach { id ->
|
val attachments = ArrayList<Attachment>()
|
||||||
serializationContext.serviceHub.attachments.openAttachment(id)?.let { attachments += it } ?: run { missing += id }
|
attachmentHashes.forEach { id ->
|
||||||
}
|
serializationContext.serviceHub.attachments.openAttachment(id)?.let { attachments += it } ?: run { missing += id }
|
||||||
missing.isNotEmpty() && throw MissingAttachmentsException(missing)
|
}
|
||||||
AttachmentsClassLoader(attachments)
|
missing.isNotEmpty() && throw MissingAttachmentsException(missing)
|
||||||
})
|
AttachmentsClassLoader(attachments)
|
||||||
|
})
|
||||||
|
} catch (e: ExecutionException) {
|
||||||
|
// Caught from within the cache get, so unwrap.
|
||||||
|
throw e.cause!!
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun withProperty(property: Any, value: Any): SerializationContext {
|
override fun withProperty(property: Any, value: Any): SerializationContext {
|
||||||
|
@ -15,7 +15,7 @@ import kotlin.reflect.jvm.javaConstructor
|
|||||||
open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPSerializer<Any> {
|
open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPSerializer<Any> {
|
||||||
override val type: Type get() = clazz
|
override val type: Type get() = clazz
|
||||||
open val kotlinConstructor = constructorForDeserialization(clazz)
|
open val kotlinConstructor = constructorForDeserialization(clazz)
|
||||||
val javaConstructor by lazy { kotlinConstructor?.javaConstructor?.apply { isAccessible = true } }
|
val javaConstructor by lazy { kotlinConstructor?.javaConstructor }
|
||||||
|
|
||||||
private val logger = loggerFor<ObjectSerializer>()
|
private val logger = loggerFor<ObjectSerializer>()
|
||||||
|
|
||||||
|
@ -13,6 +13,7 @@ import kotlin.reflect.KFunction
|
|||||||
import kotlin.reflect.KParameter
|
import kotlin.reflect.KParameter
|
||||||
import kotlin.reflect.full.findAnnotation
|
import kotlin.reflect.full.findAnnotation
|
||||||
import kotlin.reflect.full.primaryConstructor
|
import kotlin.reflect.full.primaryConstructor
|
||||||
|
import kotlin.reflect.jvm.isAccessible
|
||||||
import kotlin.reflect.jvm.javaType
|
import kotlin.reflect.jvm.javaType
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -48,7 +49,9 @@ internal fun constructorForDeserialization(type: Type): KFunction<Any>? {
|
|||||||
preferredCandidate = kotlinConstructor
|
preferredCandidate = kotlinConstructor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return preferredCandidate ?: throw NotSerializableException("No constructor for deserialization found for $clazz.")
|
|
||||||
|
return preferredCandidate?.apply { isAccessible = true}
|
||||||
|
?: throw NotSerializableException("No constructor for deserialization found for $clazz.")
|
||||||
} else {
|
} else {
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package net.corda.nodeapi.internal.serialization.amqp.custom
|
|||||||
|
|
||||||
import net.corda.core.CordaRuntimeException
|
import net.corda.core.CordaRuntimeException
|
||||||
import net.corda.core.CordaThrowable
|
import net.corda.core.CordaThrowable
|
||||||
|
import net.corda.core.utilities.loggerFor
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
|
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.constructorForDeserialization
|
import net.corda.nodeapi.internal.serialization.amqp.constructorForDeserialization
|
||||||
@ -9,6 +10,11 @@ import net.corda.nodeapi.internal.serialization.amqp.propertiesForSerialization
|
|||||||
import java.io.NotSerializableException
|
import java.io.NotSerializableException
|
||||||
|
|
||||||
class ThrowableSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<Throwable, ThrowableSerializer.ThrowableProxy>(Throwable::class.java, ThrowableProxy::class.java, factory) {
|
class ThrowableSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<Throwable, ThrowableSerializer.ThrowableProxy>(Throwable::class.java, ThrowableProxy::class.java, factory) {
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
private val logger = loggerFor<ThrowableSerializer>()
|
||||||
|
}
|
||||||
|
|
||||||
override val additionalSerializers: Iterable<CustomSerializer<out Any>> = listOf(StackTraceElementSerializer(factory))
|
override val additionalSerializers: Iterable<CustomSerializer<out Any>> = listOf(StackTraceElementSerializer(factory))
|
||||||
|
|
||||||
override fun toProxy(obj: Throwable): ThrowableProxy {
|
override fun toProxy(obj: Throwable): ThrowableProxy {
|
||||||
@ -33,7 +39,7 @@ class ThrowableSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<T
|
|||||||
override fun fromProxy(proxy: ThrowableProxy): Throwable {
|
override fun fromProxy(proxy: ThrowableProxy): Throwable {
|
||||||
try {
|
try {
|
||||||
// TODO: This will need reworking when we have multiple class loaders
|
// TODO: This will need reworking when we have multiple class loaders
|
||||||
val clazz = Class.forName(proxy.exceptionClass, false, this.javaClass.classLoader)
|
val clazz = Class.forName(proxy.exceptionClass, false, factory.classloader)
|
||||||
// If it is CordaException or CordaRuntimeException, we can seek any constructor and then set the properties
|
// If it is CordaException or CordaRuntimeException, we can seek any constructor and then set the properties
|
||||||
// Otherwise we just make a CordaRuntimeException
|
// Otherwise we just make a CordaRuntimeException
|
||||||
if (CordaThrowable::class.java.isAssignableFrom(clazz) && Throwable::class.java.isAssignableFrom(clazz)) {
|
if (CordaThrowable::class.java.isAssignableFrom(clazz) && Throwable::class.java.isAssignableFrom(clazz)) {
|
||||||
@ -50,7 +56,7 @@ class ThrowableSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<T
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
// If attempts to rebuild the exact exception fail, we fall through and build a runtime exception.
|
logger.warn("Unexpected exception de-serializing throwable: ${proxy.exceptionClass}. Converting to CordaRuntimeException.", e)
|
||||||
}
|
}
|
||||||
// If the criteria are not met or we experience an exception constructing the exception, we fall back to our own unchecked exception.
|
// If the criteria are not met or we experience an exception constructing the exception, we fall back to our own unchecked exception.
|
||||||
return CordaRuntimeException(proxy.exceptionClass).apply {
|
return CordaRuntimeException(proxy.exceptionClass).apply {
|
||||||
|
@ -10,7 +10,6 @@ import net.corda.core.internal.declaredField
|
|||||||
import net.corda.core.node.ServiceHub
|
import net.corda.core.node.ServiceHub
|
||||||
import net.corda.core.node.services.AttachmentStorage
|
import net.corda.core.node.services.AttachmentStorage
|
||||||
import net.corda.core.serialization.*
|
import net.corda.core.serialization.*
|
||||||
import net.corda.core.serialization.SerializationFactory
|
|
||||||
import net.corda.core.transactions.LedgerTransaction
|
import net.corda.core.transactions.LedgerTransaction
|
||||||
import net.corda.core.transactions.TransactionBuilder
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
import net.corda.core.utilities.ByteSequence
|
import net.corda.core.utilities.ByteSequence
|
||||||
@ -377,4 +376,24 @@ class AttachmentClassLoaderTests : TestDependencyInjectionBase() {
|
|||||||
// Then deserialize with the attachment class loader associated with the attachment
|
// Then deserialize with the attachment class loader associated with the attachment
|
||||||
serialized.deserialize(context = inboundContext)
|
serialized.deserialize(context = inboundContext)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test loading a class with attachment missing during deserialization`() {
|
||||||
|
val child = ClassLoaderForTests()
|
||||||
|
val contractClass = Class.forName("net.corda.contracts.isolated.AnotherDummyContract", true, child)
|
||||||
|
val contract = contractClass.newInstance() as DummyContractBackdoor
|
||||||
|
val storage = MockAttachmentStorage()
|
||||||
|
val attachmentRef = SecureHash.randomSHA256()
|
||||||
|
val outboundContext = SerializationFactory.defaultFactory.defaultContext.withClassLoader(child)
|
||||||
|
// Serialize with custom context to avoid populating the default context with the specially loaded class
|
||||||
|
val serialized = contract.serialize(context = outboundContext)
|
||||||
|
|
||||||
|
// Then deserialize with the attachment class loader associated with the attachment
|
||||||
|
val e = assertFailsWith(MissingAttachmentsException::class) {
|
||||||
|
// We currently ignore annotations in attachments, so manually whitelist.
|
||||||
|
val inboundContext = SerializationFactory.defaultFactory.defaultContext.withWhitelisted(contract.javaClass).withAttachmentStorage(storage).withAttachmentsClassLoader(listOf(attachmentRef))
|
||||||
|
serialized.deserialize(context = inboundContext)
|
||||||
|
}
|
||||||
|
assertEquals(attachmentRef, e.ids.single())
|
||||||
|
}
|
||||||
}
|
}
|
@ -55,6 +55,7 @@ import net.corda.node.services.schema.HibernateObserver
|
|||||||
import net.corda.node.services.schema.NodeSchemaService
|
import net.corda.node.services.schema.NodeSchemaService
|
||||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||||
import net.corda.node.services.statemachine.StateMachineManager
|
import net.corda.node.services.statemachine.StateMachineManager
|
||||||
|
import net.corda.node.services.statemachine.appName
|
||||||
import net.corda.node.services.statemachine.flowVersionAndInitiatingClass
|
import net.corda.node.services.statemachine.flowVersionAndInitiatingClass
|
||||||
import net.corda.node.services.transactions.*
|
import net.corda.node.services.transactions.*
|
||||||
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
||||||
@ -349,13 +350,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
|||||||
require(classWithAnnotation == initiatingFlow) {
|
require(classWithAnnotation == initiatingFlow) {
|
||||||
"${InitiatedBy::class.java.name} must point to ${classWithAnnotation.name} and not ${initiatingFlow.name}"
|
"${InitiatedBy::class.java.name} must point to ${classWithAnnotation.name} and not ${initiatingFlow.name}"
|
||||||
}
|
}
|
||||||
val jarFile = Paths.get(initiatedFlow.protectionDomain.codeSource.location.toURI())
|
val flowFactory = InitiatedFlowFactory.CorDapp(version, initiatedFlow.appName, { ctor.newInstance(it) })
|
||||||
val appName = if (jarFile.isRegularFile() && jarFile.toString().endsWith(".jar")) {
|
|
||||||
jarFile.fileName.toString().removeSuffix(".jar")
|
|
||||||
} else {
|
|
||||||
"<unknown>"
|
|
||||||
}
|
|
||||||
val flowFactory = InitiatedFlowFactory.CorDapp(version, appName, { ctor.newInstance(it) })
|
|
||||||
val observable = internalRegisterFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
|
val observable = internalRegisterFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
|
||||||
log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)")
|
log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)")
|
||||||
return observable
|
return observable
|
||||||
|
@ -15,6 +15,7 @@ import net.corda.core.internal.FlowStateMachine
|
|||||||
import net.corda.core.internal.abbreviate
|
import net.corda.core.internal.abbreviate
|
||||||
import net.corda.core.internal.concurrent.OpenFuture
|
import net.corda.core.internal.concurrent.OpenFuture
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
|
import net.corda.core.internal.isRegularFile
|
||||||
import net.corda.core.internal.staticField
|
import net.corda.core.internal.staticField
|
||||||
import net.corda.core.transactions.SignedTransaction
|
import net.corda.core.transactions.SignedTransaction
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
@ -27,6 +28,7 @@ import net.corda.node.utilities.DatabaseTransaction
|
|||||||
import net.corda.node.utilities.DatabaseTransactionManager
|
import net.corda.node.utilities.DatabaseTransactionManager
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
|
import java.nio.file.Paths
|
||||||
import java.sql.SQLException
|
import java.sql.SQLException
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
@ -347,7 +349,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty), retryable)
|
val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty), retryable)
|
||||||
openSessions[Pair(sessionFlow, otherParty)] = session
|
openSessions[Pair(sessionFlow, otherParty)] = session
|
||||||
val (version, initiatingFlowClass) = sessionFlow.javaClass.flowVersionAndInitiatingClass
|
val (version, initiatingFlowClass) = sessionFlow.javaClass.flowVersionAndInitiatingClass
|
||||||
val sessionInit = SessionInit(session.ourSessionId, initiatingFlowClass.name, version, "not defined", firstPayload)
|
val sessionInit = SessionInit(session.ourSessionId, initiatingFlowClass.name, version, sessionFlow.javaClass.appName, firstPayload)
|
||||||
sendInternal(session, sessionInit)
|
sendInternal(session, sessionInit)
|
||||||
if (waitForConfirmation) {
|
if (waitForConfirmation) {
|
||||||
session.waitForConfirmation()
|
session.waitForConfirmation()
|
||||||
@ -491,3 +493,12 @@ val Class<out FlowLogic<*>>.flowVersionAndInitiatingClass: Pair<Int, Class<out F
|
|||||||
"${InitiatingFlow::class.java.name}. See https://docs.corda.net/api-flows.html#flowlogic-annotations.")
|
"${InitiatingFlow::class.java.name}. See https://docs.corda.net/api-flows.html#flowlogic-annotations.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val Class<out FlowLogic<*>>.appName: String get() {
|
||||||
|
val jarFile = Paths.get(protectionDomain.codeSource.location.toURI())
|
||||||
|
return if (jarFile.isRegularFile() && jarFile.toString().endsWith(".jar")) {
|
||||||
|
jarFile.fileName.toString().removeSuffix(".jar")
|
||||||
|
} else {
|
||||||
|
"<unknown>"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -40,10 +40,10 @@ import net.corda.node.utilities.wrapWithDatabaseTransaction
|
|||||||
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
|
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
|
||||||
import net.corda.nodeapi.internal.serialization.withTokenContext
|
import net.corda.nodeapi.internal.serialization.withTokenContext
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||||
import org.bouncycastle.asn1.x500.X500Name
|
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
|
import java.io.NotSerializableException
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
@ -609,13 +609,20 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
|
|
||||||
val serialized = try {
|
val serialized = try {
|
||||||
message.serialize()
|
message.serialize()
|
||||||
} catch (e: KryoException) {
|
} catch (e: Exception) {
|
||||||
if (message !is ErrorSessionEnd || message.errorResponse == null) throw e
|
when(e) {
|
||||||
logger.warn("Something in ${message.errorResponse.javaClass.name} is not serialisable. " +
|
// Handling Kryo and AMQP serialization problems. Unfortunately the two exception types do not share much of a common exception interface.
|
||||||
"Instead sending back an exception which is serialisable to ensure session end occurs properly.", e)
|
is KryoException,
|
||||||
// The subclass may have overridden toString so we use that
|
is NotSerializableException -> {
|
||||||
val exMessage = message.errorResponse.let { if (it.javaClass != FlowException::class.java) it.toString() else it.message }
|
if (message !is ErrorSessionEnd || message.errorResponse == null) throw e
|
||||||
message.copy(errorResponse = FlowException(exMessage)).serialize()
|
logger.warn("Something in ${message.errorResponse.javaClass.name} is not serialisable. " +
|
||||||
|
"Instead sending back an exception which is serialisable to ensure session end occurs properly.", e)
|
||||||
|
// The subclass may have overridden toString so we use that
|
||||||
|
val exMessage = message.errorResponse.let { if (it.javaClass != FlowException::class.java) it.toString() else it.message }
|
||||||
|
message.copy(errorResponse = FlowException(exMessage)).serialize()
|
||||||
|
}
|
||||||
|
else -> throw e
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceHub.networkService.apply {
|
serviceHub.networkService.apply {
|
||||||
|
@ -10,6 +10,7 @@ import net.corda.core.internal.list
|
|||||||
import net.corda.core.messaging.startFlow
|
import net.corda.core.messaging.startFlow
|
||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.core.utilities.getX500Name
|
import net.corda.core.utilities.getX500Name
|
||||||
|
import net.corda.core.utilities.unwrap
|
||||||
import net.corda.nodeapi.User
|
import net.corda.nodeapi.User
|
||||||
import net.corda.smoketesting.NodeConfig
|
import net.corda.smoketesting.NodeConfig
|
||||||
import net.corda.smoketesting.NodeProcess
|
import net.corda.smoketesting.NodeProcess
|
||||||
@ -40,16 +41,19 @@ class CordappSmokeTest {
|
|||||||
fun `FlowContent appName returns the filename of the CorDapp jar`() {
|
fun `FlowContent appName returns the filename of the CorDapp jar`() {
|
||||||
val pluginsDir = (factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
|
val pluginsDir = (factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
|
||||||
// Find the jar file for the smoke tests of this module
|
// Find the jar file for the smoke tests of this module
|
||||||
val selfCorDapp = Paths.get("build", "libs").list {
|
val selfCordapp = Paths.get("build", "libs").list {
|
||||||
it.filter { "-smoke-test" in it.toString() }.toList().single()
|
it.filter { "-smoke-test" in it.toString() }.toList().single()
|
||||||
}
|
}
|
||||||
selfCorDapp.copyToDirectory(pluginsDir)
|
selfCordapp.copyToDirectory(pluginsDir)
|
||||||
|
|
||||||
factory.create(aliceConfig).use { alice ->
|
factory.create(aliceConfig).use { alice ->
|
||||||
alice.connect().use { connectionToAlice ->
|
alice.connect().use { connectionToAlice ->
|
||||||
val aliceIdentity = connectionToAlice.proxy.nodeIdentity().legalIdentity
|
val aliceIdentity = connectionToAlice.proxy.nodeIdentity().legalIdentity
|
||||||
val future = connectionToAlice.proxy.startFlow(::DummyInitiatingFlow, aliceIdentity).returnValue
|
val future = connectionToAlice.proxy.startFlow(::GatherContextsFlow, aliceIdentity).returnValue
|
||||||
assertThat(future.getOrThrow().appName).isEqualTo(selfCorDapp.fileName.toString().removeSuffix(".jar"))
|
val (sessionInitContext, sessionConfirmContext) = future.getOrThrow()
|
||||||
|
val selfCordappName = selfCordapp.fileName.toString().removeSuffix(".jar")
|
||||||
|
assertThat(sessionInitContext.appName).isEqualTo(selfCordappName)
|
||||||
|
assertThat(sessionConfirmContext.appName).isEqualTo(selfCordappName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -62,15 +66,26 @@ class CordappSmokeTest {
|
|||||||
|
|
||||||
@InitiatingFlow
|
@InitiatingFlow
|
||||||
@StartableByRPC
|
@StartableByRPC
|
||||||
class DummyInitiatingFlow(val otherParty: Party) : FlowLogic<FlowContext>() {
|
class GatherContextsFlow(private val otherParty: Party) : FlowLogic<Pair<FlowContext, FlowContext>>() {
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun call() = getFlowContext(otherParty)
|
override fun call(): Pair<FlowContext, FlowContext> {
|
||||||
|
// This receive will kick off SendBackInitiatorFlowContext by sending a session-init with our app name.
|
||||||
|
// SendBackInitiatorFlowContext will send back our context using the information from this session-init
|
||||||
|
val sessionInitContext = receive<FlowContext>(otherParty).unwrap { it }
|
||||||
|
// This context is taken from the session-confirm message
|
||||||
|
val sessionConfirmContext = getFlowContext(otherParty)
|
||||||
|
return Pair(sessionInitContext, sessionConfirmContext)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Suppress("unused")
|
@Suppress("unused")
|
||||||
@InitiatedBy(DummyInitiatingFlow::class)
|
@InitiatedBy(GatherContextsFlow::class)
|
||||||
class DummyInitiatedFlow(val otherParty: Party) : FlowLogic<Unit>() {
|
class SendBackInitiatorFlowContext(private val otherParty: Party) : FlowLogic<Unit>() {
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun call() = Unit
|
override fun call() {
|
||||||
|
// An initiated flow calling getFlowContext on its initiator will get the context from the session-init
|
||||||
|
val sessionInitContext = getFlowContext(otherParty)
|
||||||
|
send(otherParty, sessionInitContext)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -122,7 +122,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
|||||||
resetTestSerialization()
|
resetTestSerialization()
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant, private val myIdentity: Party) : LinearState, SchedulableState {
|
class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant, val myIdentity: Party) : LinearState, SchedulableState {
|
||||||
override val participants: List<AbstractParty>
|
override val participants: List<AbstractParty>
|
||||||
get() = listOf(myIdentity)
|
get() = listOf(myIdentity)
|
||||||
|
|
||||||
|
@ -77,9 +77,9 @@ class IRSDemoTest : IntegrationTestCategory {
|
|||||||
assertThat(getFloatingLegFixCount(nodeAApi) == 0)
|
assertThat(getFloatingLegFixCount(nodeAApi) == 0)
|
||||||
|
|
||||||
// Wait until the initial trade and all scheduled fixings up to the current date have finished
|
// Wait until the initial trade and all scheduled fixings up to the current date have finished
|
||||||
nextFixingDates.firstWithTimeout(maxWaitTime){ it == null || it > currentDate }
|
nextFixingDates.firstWithTimeout(maxWaitTime) { it == null || it >= currentDate }
|
||||||
runDateChange(nodeBApi)
|
runDateChange(nodeBApi)
|
||||||
nextFixingDates.firstWithTimeout(maxWaitTime) { it == null || it > futureDate }
|
nextFixingDates.firstWithTimeout(maxWaitTime) { it == null || it >= futureDate }
|
||||||
|
|
||||||
assertThat(getFloatingLegFixCount(nodeAApi) > 0)
|
assertThat(getFloatingLegFixCount(nodeAApi) > 0)
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ import net.corda.core.messaging.SingleMessageRecipient
|
|||||||
import net.corda.core.node.CityDatabase
|
import net.corda.core.node.CityDatabase
|
||||||
import net.corda.core.node.WorldMapLocation
|
import net.corda.core.node.WorldMapLocation
|
||||||
import net.corda.core.node.services.ServiceInfo
|
import net.corda.core.node.services.ServiceInfo
|
||||||
import net.corda.core.node.services.containsType
|
import net.corda.core.node.services.ServiceType
|
||||||
import net.corda.core.utilities.ProgressTracker
|
import net.corda.core.utilities.ProgressTracker
|
||||||
import net.corda.core.utilities.getX500Name
|
import net.corda.core.utilities.getX500Name
|
||||||
import net.corda.core.utilities.locality
|
import net.corda.core.utilities.locality
|
||||||
@ -278,3 +278,9 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
|
|||||||
mockNet.stopNodes()
|
mockNet.stopNodes()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper function for verifying that a service info contains the given type of advertised service. For non-simulation cases
|
||||||
|
* this is a configuration matter rather than implementation.
|
||||||
|
*/
|
||||||
|
fun Iterable<ServiceInfo>.containsType(type: ServiceType) = any { it.type == type }
|
@ -1,9 +1,6 @@
|
|||||||
package net.corda.testing
|
package net.corda.testing
|
||||||
|
|
||||||
import net.corda.client.mock.Generator
|
import net.corda.client.mock.Generator
|
||||||
import net.corda.client.mock.generateOrFail
|
|
||||||
import net.corda.client.mock.int
|
|
||||||
import net.corda.client.mock.string
|
|
||||||
import net.corda.client.rpc.CordaRPCClient
|
import net.corda.client.rpc.CordaRPCClient
|
||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
import net.corda.client.rpc.internal.RPCClientConfiguration
|
import net.corda.client.rpc.internal.RPCClientConfiguration
|
||||||
|
@ -4,7 +4,6 @@ import joptsimple.OptionSet
|
|||||||
import net.corda.client.mock.ErrorFlowsEventGenerator
|
import net.corda.client.mock.ErrorFlowsEventGenerator
|
||||||
import net.corda.client.mock.EventGenerator
|
import net.corda.client.mock.EventGenerator
|
||||||
import net.corda.client.mock.Generator
|
import net.corda.client.mock.Generator
|
||||||
import net.corda.client.mock.pickOne
|
|
||||||
import net.corda.client.rpc.CordaRPCConnection
|
import net.corda.client.rpc.CordaRPCConnection
|
||||||
import net.corda.core.contracts.Amount
|
import net.corda.core.contracts.Amount
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package net.corda.loadtest.tests
|
package net.corda.loadtest.tests
|
||||||
|
|
||||||
import net.corda.client.mock.Generator
|
import net.corda.client.mock.Generator
|
||||||
import net.corda.client.mock.pickN
|
|
||||||
import net.corda.core.contracts.Issued
|
import net.corda.core.contracts.Issued
|
||||||
import net.corda.core.contracts.PartyAndReference
|
import net.corda.core.contracts.PartyAndReference
|
||||||
import net.corda.core.identity.AbstractParty
|
import net.corda.core.identity.AbstractParty
|
||||||
|
@ -2,7 +2,6 @@ package net.corda.loadtest.tests
|
|||||||
|
|
||||||
import net.corda.client.mock.Generator
|
import net.corda.client.mock.Generator
|
||||||
import net.corda.client.mock.generateAmount
|
import net.corda.client.mock.generateAmount
|
||||||
import net.corda.client.mock.pickOne
|
|
||||||
import net.corda.core.contracts.Issued
|
import net.corda.core.contracts.Issued
|
||||||
import net.corda.core.contracts.PartyAndReference
|
import net.corda.core.contracts.PartyAndReference
|
||||||
import net.corda.core.contracts.withoutIssuer
|
import net.corda.core.contracts.withoutIssuer
|
||||||
|
@ -1,9 +1,6 @@
|
|||||||
package net.corda.loadtest.tests
|
package net.corda.loadtest.tests
|
||||||
|
|
||||||
import net.corda.client.mock.Generator
|
import net.corda.client.mock.Generator
|
||||||
import net.corda.client.mock.int
|
|
||||||
import net.corda.client.mock.pickOne
|
|
||||||
import net.corda.client.mock.replicate
|
|
||||||
import net.corda.core.flows.FinalityFlow
|
import net.corda.core.flows.FinalityFlow
|
||||||
import net.corda.core.flows.FlowException
|
import net.corda.core.flows.FlowException
|
||||||
import net.corda.core.internal.concurrent.thenMatch
|
import net.corda.core.internal.concurrent.thenMatch
|
||||||
|
@ -2,8 +2,6 @@ package net.corda.loadtest.tests
|
|||||||
|
|
||||||
import de.danielbechler.diff.ObjectDifferFactory
|
import de.danielbechler.diff.ObjectDifferFactory
|
||||||
import net.corda.client.mock.Generator
|
import net.corda.client.mock.Generator
|
||||||
import net.corda.client.mock.pickOne
|
|
||||||
import net.corda.client.mock.replicatePoisson
|
|
||||||
import net.corda.finance.contracts.asset.Cash
|
import net.corda.finance.contracts.asset.Cash
|
||||||
import net.corda.core.flows.FlowException
|
import net.corda.core.flows.FlowException
|
||||||
import net.corda.core.identity.AbstractParty
|
import net.corda.core.identity.AbstractParty
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package net.corda.verifier
|
package net.corda.verifier
|
||||||
|
|
||||||
import net.corda.client.mock.generateOrFail
|
|
||||||
import net.corda.core.internal.concurrent.map
|
import net.corda.core.internal.concurrent.map
|
||||||
import net.corda.core.internal.concurrent.transpose
|
import net.corda.core.internal.concurrent.transpose
|
||||||
import net.corda.core.messaging.startFlow
|
import net.corda.core.messaging.startFlow
|
||||||
|
Loading…
Reference in New Issue
Block a user