mirror of
https://github.com/corda/corda.git
synced 2025-04-21 17:42:05 +00:00
Merge pull request #1598 from corda/parkri-os-merge-20181126-1
This commit is contained in:
commit
96205b26c4
@ -2271,7 +2271,7 @@ public final class net.corda.core.flows.NotaryFlow extends java.lang.Object
|
||||
##
|
||||
@DoNotImplement
|
||||
@InitiatingFlow
|
||||
public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.flows.FlowLogic implements net.corda.core.internal.TimedFlow
|
||||
public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.internal.BackpressureAwareTimedFlow
|
||||
public <init>(net.corda.core.transactions.SignedTransaction)
|
||||
public <init>(net.corda.core.transactions.SignedTransaction, net.corda.core.utilities.ProgressTracker)
|
||||
@Suspendable
|
||||
|
@ -1,10 +1,6 @@
|
||||
package net.corda.common.configuration.parsing.internal
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigException
|
||||
import com.typesafe.config.ConfigObject
|
||||
import com.typesafe.config.ConfigValue
|
||||
import com.typesafe.config.ConfigValueFactory
|
||||
import com.typesafe.config.*
|
||||
import net.corda.common.configuration.parsing.internal.versioned.VersionExtractor
|
||||
import net.corda.common.validation.internal.Validated
|
||||
import net.corda.common.validation.internal.Validated.Companion.invalid
|
||||
@ -25,7 +21,7 @@ object Configuration {
|
||||
/**
|
||||
* Describes a [Config] hiding sensitive data.
|
||||
*/
|
||||
fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue = { value -> ConfigValueFactory.fromAnyRef(value.toString()) }): ConfigValue?
|
||||
fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue = { value -> ConfigValueFactory.fromAnyRef(value.toString()) }): ConfigValue?
|
||||
}
|
||||
|
||||
object Value {
|
||||
@ -131,6 +127,22 @@ object Configuration {
|
||||
fun optional(): Optional<TYPE>
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a required property with a collection of values.
|
||||
*/
|
||||
interface RequiredList<TYPE> : Required<List<TYPE>> {
|
||||
|
||||
/**
|
||||
* Passes the value to a validating mapping function, provided this is valid in the first place.
|
||||
*/
|
||||
fun <MAPPED> mapValid(mappedTypeName: String, convert: (List<TYPE>) -> Validated<MAPPED, Validation.Error>): Required<MAPPED>
|
||||
|
||||
/**
|
||||
* Passes the value to a non-validating mapping function, provided this is valid in the first place.
|
||||
*/
|
||||
fun <MAPPED> map(mappedTypeName: String, convert: (List<TYPE>) -> MAPPED): Required<MAPPED> = mapValid(mappedTypeName) { value -> valid(convert.invoke(value)) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a property that must provide a single value or produce an error in case multiple values are specified for the relevant key.
|
||||
*/
|
||||
@ -139,7 +151,7 @@ object Configuration {
|
||||
/**
|
||||
* Returns a required property expecting multiple values for the relevant key.
|
||||
*/
|
||||
fun list(): Required<List<TYPE>>
|
||||
fun list(): RequiredList<TYPE>
|
||||
}
|
||||
|
||||
/**
|
||||
@ -161,12 +173,12 @@ object Configuration {
|
||||
/**
|
||||
* Passes the value to a validating mapping function, provided this is valid in the first place.
|
||||
*/
|
||||
fun <MAPPED : Any> mapValid(mappedTypeName: String, convert: (TYPE) -> Validated<MAPPED, Validation.Error>): Standard<MAPPED>
|
||||
fun <MAPPED> mapValid(mappedTypeName: String, convert: (TYPE) -> Validated<MAPPED, Validation.Error>): Standard<MAPPED>
|
||||
|
||||
/**
|
||||
* Passes the value to a non-validating mapping function, provided this is valid in the first place.
|
||||
*/
|
||||
fun <MAPPED : Any> map(mappedTypeName: String, convert: (TYPE) -> MAPPED): Standard<MAPPED> = mapValid(mappedTypeName) { value -> valid(convert.invoke(value)) }
|
||||
fun <MAPPED> map(mappedTypeName: String, convert: (TYPE) -> MAPPED): Standard<MAPPED> = mapValid(mappedTypeName) { value -> valid(convert.invoke(value)) }
|
||||
}
|
||||
|
||||
override fun parse(configuration: Config, options: Configuration.Validation.Options): Validated<TYPE, Validation.Error> {
|
||||
@ -268,7 +280,7 @@ object Configuration {
|
||||
*/
|
||||
fun validate(target: Config): Valid<Config> = validate(target, Configuration.Validation.Options.defaults)
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue): ConfigValue
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue): ConfigValue
|
||||
|
||||
companion object {
|
||||
|
||||
@ -358,7 +370,7 @@ object Configuration {
|
||||
|
||||
override fun validate(target: Config, options: Validation.Options) = schema.validate(target, options)
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue) = schema.describe(configuration, serialiseValue)
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue) = schema.describe(configuration, serialiseValue)
|
||||
|
||||
final override fun parse(configuration: Config, options: Configuration.Validation.Options): Valid<VALUE> = validate(configuration, options).mapValid(::parseValid)
|
||||
|
||||
|
@ -1,10 +1,6 @@
|
||||
package net.corda.common.configuration.parsing.internal
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigException
|
||||
import com.typesafe.config.ConfigObject
|
||||
import com.typesafe.config.ConfigValue
|
||||
import com.typesafe.config.ConfigValueFactory
|
||||
import com.typesafe.config.*
|
||||
import net.corda.common.validation.internal.Validated
|
||||
import net.corda.common.validation.internal.Validated.Companion.invalid
|
||||
import net.corda.common.validation.internal.Validated.Companion.valid
|
||||
@ -27,13 +23,13 @@ internal open class StandardProperty<TYPE : Any>(override val key: String, typeN
|
||||
|
||||
override val typeName: String = schema?.let { "#${it.name ?: "Object@$key"}" } ?: typeNameArg
|
||||
|
||||
override fun <MAPPED : Any> mapValid(mappedTypeName: String, convert: (TYPE) -> Valid<MAPPED>): Configuration.Property.Definition.Standard<MAPPED> = FunctionalProperty(this, mappedTypeName, extractListValue, convert)
|
||||
override fun <MAPPED> mapValid(mappedTypeName: String, convert: (TYPE) -> Valid<MAPPED>): Configuration.Property.Definition.Standard<MAPPED> = FunctionalProperty(this, mappedTypeName, extractListValue, convert)
|
||||
|
||||
override fun optional(): Configuration.Property.Definition.Optional<TYPE> = OptionalDelegatedProperty(this)
|
||||
|
||||
override fun list(): Configuration.Property.Definition.Required<List<TYPE>> = ListProperty(this)
|
||||
override fun list(): Configuration.Property.Definition.RequiredList<TYPE> = ListProperty(this)
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue): ConfigValue {
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue): ConfigValue {
|
||||
|
||||
if (isSensitive) {
|
||||
return valueDescription(Configuration.Property.Definition.SENSITIVE_DATA_PLACEHOLDER, serialiseValue)
|
||||
@ -61,7 +57,7 @@ internal open class StandardProperty<TYPE : Any>(override val key: String, typeN
|
||||
override fun toString() = "\"$key\": \"$typeName\""
|
||||
}
|
||||
|
||||
private class ListProperty<TYPE : Any>(delegate: StandardProperty<TYPE>) : RequiredDelegatedProperty<List<TYPE>, StandardProperty<TYPE>>(delegate) {
|
||||
private class ListProperty<TYPE : Any>(delegate: StandardProperty<TYPE>) : RequiredDelegatedProperty<List<TYPE>, StandardProperty<TYPE>>(delegate), Configuration.Property.Definition.RequiredList<TYPE> {
|
||||
|
||||
override val typeName: String = "List<${delegate.typeName}>"
|
||||
|
||||
@ -79,7 +75,9 @@ private class ListProperty<TYPE : Any>(delegate: StandardProperty<TYPE>) : Requi
|
||||
return Validated.withResult(target, errors)
|
||||
}
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue): ConfigValue {
|
||||
override fun <MAPPED> mapValid(mappedTypeName: String, convert: (List<TYPE>) -> Validated<MAPPED, Configuration.Validation.Error>): Configuration.Property.Definition.Required<MAPPED> = ListMappingProperty(this, mappedTypeName, convert)
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue): ConfigValue {
|
||||
|
||||
if (isSensitive) {
|
||||
return valueDescription(Configuration.Property.Definition.SENSITIVE_DATA_PLACEHOLDER, serialiseValue)
|
||||
@ -102,28 +100,28 @@ private class ListProperty<TYPE : Any>(delegate: StandardProperty<TYPE>) : Requi
|
||||
}
|
||||
}
|
||||
|
||||
private class OptionalPropertyWithDefault<TYPE : Any>(delegate: Configuration.Property.Definition.Optional<TYPE>, private val defaultValue: TYPE) : DelegatedProperty<TYPE, Configuration.Property.Definition.Optional<TYPE>>(delegate) {
|
||||
private class OptionalPropertyWithDefault<TYPE>(delegate: Configuration.Property.Definition.Optional<TYPE>, private val defaultValue: TYPE) : DelegatedProperty<TYPE, Configuration.Property.Definition.Optional<TYPE>>(delegate) {
|
||||
|
||||
override val isMandatory: Boolean = false
|
||||
|
||||
override val typeName: String = delegate.typeName.removeSuffix("?")
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue): ConfigValue? = delegate.describe(configuration, serialiseValue) ?: valueDescription(if (isSensitive) Configuration.Property.Definition.SENSITIVE_DATA_PLACEHOLDER else defaultValue, serialiseValue)
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue): ConfigValue? = delegate.describe(configuration, serialiseValue) ?: valueDescription(if (isSensitive) Configuration.Property.Definition.SENSITIVE_DATA_PLACEHOLDER else defaultValue, serialiseValue)
|
||||
|
||||
override fun valueIn(configuration: Config): TYPE = delegate.valueIn(configuration) ?: defaultValue
|
||||
|
||||
override fun validate(target: Config, options: Configuration.Validation.Options): Valid<Config> = delegate.validate(target, options)
|
||||
}
|
||||
|
||||
private class FunctionalProperty<TYPE, MAPPED : Any>(delegate: Configuration.Property.Definition.Standard<TYPE>, private val mappedTypeName: String, internal val extractListValue: (Config, String) -> List<TYPE>, private val convert: (TYPE) -> Valid<MAPPED>) : RequiredDelegatedProperty<MAPPED, Configuration.Property.Definition.Standard<TYPE>>(delegate), Configuration.Property.Definition.Standard<MAPPED> {
|
||||
private class FunctionalProperty<TYPE, MAPPED>(delegate: Configuration.Property.Definition.Standard<TYPE>, private val mappedTypeName: String, internal val extractListValue: (Config, String) -> List<TYPE>, private val convert: (TYPE) -> Valid<MAPPED>) : RequiredDelegatedProperty<MAPPED, Configuration.Property.Definition.Standard<TYPE>>(delegate), Configuration.Property.Definition.Standard<MAPPED> {
|
||||
|
||||
override fun valueIn(configuration: Config) = convert.invoke(delegate.valueIn(configuration)).value()
|
||||
|
||||
override val typeName: String = if (super.typeName == "#$mappedTypeName") super.typeName else "$mappedTypeName(${super.typeName})"
|
||||
|
||||
override fun <M : Any> mapValid(mappedTypeName: String, convert: (MAPPED) -> Valid<M>): Configuration.Property.Definition.Standard<M> = FunctionalProperty(delegate, mappedTypeName, extractListValue, { target: TYPE -> this.convert.invoke(target).mapValid(convert) })
|
||||
override fun <M> mapValid(mappedTypeName: String, convert: (MAPPED) -> Valid<M>): Configuration.Property.Definition.Standard<M> = FunctionalProperty(delegate, mappedTypeName, extractListValue, { target: TYPE -> this.convert.invoke(target).mapValid(convert) })
|
||||
|
||||
override fun list(): Configuration.Property.Definition.Required<List<MAPPED>> = FunctionalListProperty(this)
|
||||
override fun list(): Configuration.Property.Definition.RequiredList<MAPPED> = FunctionalListProperty(this)
|
||||
|
||||
override fun validate(target: Config, options: Configuration.Validation.Options): Valid<Config> {
|
||||
|
||||
@ -135,10 +133,10 @@ private class FunctionalProperty<TYPE, MAPPED : Any>(delegate: Configuration.Pro
|
||||
return Validated.withResult(target, errors)
|
||||
}
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue) = delegate.describe(configuration, serialiseValue)
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue) = delegate.describe(configuration, serialiseValue)
|
||||
}
|
||||
|
||||
private class FunctionalListProperty<RAW, TYPE : Any>(delegate: FunctionalProperty<RAW, TYPE>) : RequiredDelegatedProperty<List<TYPE>, FunctionalProperty<RAW, TYPE>>(delegate) {
|
||||
private class FunctionalListProperty<RAW, TYPE>(delegate: FunctionalProperty<RAW, TYPE>) : RequiredDelegatedProperty<List<TYPE>, FunctionalProperty<RAW, TYPE>>(delegate), Configuration.Property.Definition.RequiredList<TYPE> {
|
||||
|
||||
override val typeName: String = "List<${super.typeName}>"
|
||||
|
||||
@ -167,13 +165,15 @@ private class FunctionalListProperty<RAW, TYPE : Any>(delegate: FunctionalProper
|
||||
}
|
||||
}
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue): ConfigValue {
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue): ConfigValue {
|
||||
|
||||
if (isSensitive) {
|
||||
return valueDescription(Configuration.Property.Definition.SENSITIVE_DATA_PLACEHOLDER, serialiseValue)
|
||||
}
|
||||
return delegate.schema?.let { schema -> valueDescription(valueIn(configuration).asSequence().map { element -> valueDescription(element, serialiseValue) }.map { it as ConfigObject }.map(ConfigObject::toConfig).map { schema.describe(it, serialiseValue) }.toList(), serialiseValue) } ?: valueDescription(valueIn(configuration), serialiseValue)
|
||||
}
|
||||
|
||||
override fun <MAPPED> mapValid(mappedTypeName: String, convert: (List<TYPE>) -> Validated<MAPPED, Configuration.Validation.Error>): Configuration.Property.Definition.Required<MAPPED> = ListMappingProperty(this, mappedTypeName, convert)
|
||||
}
|
||||
|
||||
private abstract class DelegatedProperty<TYPE, DELEGATE : Configuration.Property.Metadata>(protected val delegate: DELEGATE) : Configuration.Property.Metadata by delegate, Configuration.Property.Definition<TYPE> {
|
||||
@ -181,13 +181,13 @@ private abstract class DelegatedProperty<TYPE, DELEGATE : Configuration.Property
|
||||
final override fun toString() = "\"$key\": \"$typeName\""
|
||||
}
|
||||
|
||||
private class OptionalDelegatedProperty<TYPE : Any>(private val delegate: Configuration.Property.Definition<TYPE>) : Configuration.Property.Metadata by delegate, Configuration.Property.Definition.Optional<TYPE> {
|
||||
private class OptionalDelegatedProperty<TYPE>(private val delegate: Configuration.Property.Definition<TYPE>) : Configuration.Property.Metadata by delegate, Configuration.Property.Definition.Optional<TYPE> {
|
||||
|
||||
override val isMandatory: Boolean = false
|
||||
|
||||
override val typeName: String = "${delegate.typeName}?"
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue) = if (isSpecifiedBy(configuration)) delegate.describe(configuration, serialiseValue) else null
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue) = if (isSpecifiedBy(configuration)) delegate.describe(configuration, serialiseValue) else null
|
||||
|
||||
override fun valueIn(configuration: Config): TYPE? {
|
||||
|
||||
@ -214,11 +214,39 @@ private class OptionalDelegatedProperty<TYPE : Any>(private val delegate: Config
|
||||
}
|
||||
|
||||
|
||||
private abstract class RequiredDelegatedProperty<TYPE : Any, DELEGATE : Configuration.Property.Definition.Required<*>>(delegate: DELEGATE) : DelegatedProperty<TYPE, DELEGATE>(delegate), Configuration.Property.Definition.Required<TYPE> {
|
||||
private abstract class RequiredDelegatedProperty<TYPE, DELEGATE : Configuration.Property.Definition.Required<*>>(delegate: DELEGATE) : DelegatedProperty<TYPE, DELEGATE>(delegate), Configuration.Property.Definition.Required<TYPE> {
|
||||
|
||||
final override fun optional(): Configuration.Property.Definition.Optional<TYPE> = OptionalDelegatedProperty(this)
|
||||
}
|
||||
|
||||
private class ListMappingProperty<TYPE, MAPPED>(private val delegate: Configuration.Property.Definition.RequiredList<TYPE>, private val mappedTypeName: String, private val convert: (List<TYPE>) -> Validated<MAPPED, Configuration.Validation.Error>) : Configuration.Property.Definition.Required<MAPPED> {
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue): ConfigValue? = delegate.describe(configuration, serialiseValue)
|
||||
|
||||
override fun valueIn(configuration: Config) = convert.invoke(delegate.valueIn(configuration)).value()
|
||||
|
||||
override fun optional(): Configuration.Property.Definition.Optional<MAPPED> = OptionalDelegatedProperty(this)
|
||||
|
||||
override fun validate(target: Config, options: Configuration.Validation.Options): Validated<Config, Configuration.Validation.Error> {
|
||||
|
||||
val errors = mutableSetOf<Configuration.Validation.Error>()
|
||||
errors += delegate.validate(target, options).errors
|
||||
if (errors.isEmpty()) {
|
||||
errors += convert.invoke(delegate.valueIn(target)).mapErrors { error -> error.with(delegate.key, mappedTypeName) }.errors
|
||||
}
|
||||
return Validated.withResult(target, errors)
|
||||
}
|
||||
|
||||
override val typeName: String = mappedTypeName
|
||||
|
||||
override val key = delegate.key
|
||||
override val isMandatory = delegate.isMandatory
|
||||
override val isSensitive = delegate.isSensitive
|
||||
override val schema = delegate.schema
|
||||
|
||||
override fun toString() = "\"$key\": \"$typeName\""
|
||||
}
|
||||
|
||||
fun ConfigException.toValidationError(keyName: String? = null, typeName: String): Configuration.Validation.Error {
|
||||
|
||||
val toError = when (this) {
|
||||
@ -249,4 +277,4 @@ private val expectedExceptionTypes = setOf(ConfigException.Missing::class, Confi
|
||||
|
||||
private fun isErrorExpected(error: ConfigException) = expectedExceptionTypes.any { expected -> expected.isInstance(error) }
|
||||
|
||||
private fun valueDescription(value: Any, serialiseValue: (Any) -> ConfigValue) = serialiseValue.invoke(value)
|
||||
private fun valueDescription(value: Any?, serialiseValue: (Any?) -> ConfigValue) = serialiseValue.invoke(value)
|
@ -47,7 +47,7 @@ internal class Schema(override val name: String?, unorderedProperties: Iterable<
|
||||
return description.toString()
|
||||
}
|
||||
|
||||
override fun describe(configuration: Config, serialiseValue: (Any) -> ConfigValue): ConfigValue {
|
||||
override fun describe(configuration: Config, serialiseValue: (Any?) -> ConfigValue): ConfigValue {
|
||||
|
||||
return properties.asSequence().map { it.key to it.describe(configuration, serialiseValue) }.filter { it.second != null }.fold(configObject()) { config, (key, value) -> config.withValue(key, value) }
|
||||
}
|
||||
|
@ -17,11 +17,20 @@ interface PropertyDelegate<TYPE> {
|
||||
fun optional(): PropertyDelegate.Optional<TYPE>
|
||||
}
|
||||
|
||||
interface RequiredList<TYPE>: Required<List<TYPE>> {
|
||||
|
||||
override operator fun provideDelegate(thisRef: Any?, property: KProperty<*>): ReadOnlyProperty<Any?, Configuration.Property.Definition.RequiredList<TYPE>>
|
||||
|
||||
fun <MAPPED> mapValid(mappedTypeName: String, convert: (List<TYPE>) -> Valid<MAPPED>): Required<MAPPED>
|
||||
|
||||
fun <MAPPED> map(mappedTypeName: String, convert: (List<TYPE>) -> MAPPED): Required<MAPPED> = mapValid(mappedTypeName) { value -> valid(convert.invoke(value)) }
|
||||
}
|
||||
|
||||
interface Single<TYPE> {
|
||||
|
||||
operator fun provideDelegate(thisRef: Any?, property: KProperty<*>): ReadOnlyProperty<Any?, Configuration.Property.Definition.Single<TYPE>>
|
||||
|
||||
fun list(): Required<List<TYPE>>
|
||||
fun list(): RequiredList<TYPE>
|
||||
}
|
||||
|
||||
interface Optional<TYPE> {
|
||||
@ -35,9 +44,9 @@ interface PropertyDelegate<TYPE> {
|
||||
|
||||
override operator fun provideDelegate(thisRef: Any?, property: KProperty<*>): ReadOnlyProperty<Any?, Configuration.Property.Definition.Standard<TYPE>>
|
||||
|
||||
fun <MAPPED : Any> mapValid(mappedTypeName: String, convert: (TYPE) -> Valid<MAPPED>): Standard<MAPPED>
|
||||
fun <MAPPED> mapValid(mappedTypeName: String, convert: (TYPE) -> Valid<MAPPED>): Standard<MAPPED>
|
||||
|
||||
fun <MAPPED : Any> map(mappedTypeName: String, convert: (TYPE) -> MAPPED): Standard<MAPPED> = mapValid(mappedTypeName) { value -> valid(convert.invoke(value)) }
|
||||
fun <MAPPED> map(mappedTypeName: String, convert: (TYPE) -> MAPPED): Standard<MAPPED> = mapValid(mappedTypeName) { value -> valid(convert.invoke(value)) }
|
||||
}
|
||||
|
||||
companion object {
|
||||
@ -74,11 +83,26 @@ private class PropertyDelegateImpl<TYPE>(private val key: String?, private val p
|
||||
}
|
||||
}
|
||||
|
||||
override fun list(): PropertyDelegate.Required<List<TYPE>> = ListPropertyDelegateImpl(key, prefix, sensitive, addToProperties, { k, s -> construct.invoke(k, s).list() })
|
||||
override fun list(): PropertyDelegate.RequiredList<TYPE> = ListPropertyDelegateImpl(key, prefix, sensitive, addToProperties, { k, s -> construct.invoke(k, s).list() })
|
||||
|
||||
override fun optional(): PropertyDelegate.Optional<TYPE> = OptionalPropertyDelegateImpl(key, prefix, sensitive, addToProperties, { k, s -> construct.invoke(k, s).optional() })
|
||||
|
||||
override fun <MAPPED : Any> mapValid(mappedTypeName: String, convert: (TYPE) -> Valid<MAPPED>): PropertyDelegate.Standard<MAPPED> = PropertyDelegateImpl(key, prefix, sensitive, addToProperties, { k, s -> construct.invoke(k, s).mapValid(mappedTypeName) { value -> convert.invoke(value) } })
|
||||
override fun <MAPPED> mapValid(mappedTypeName: String, convert: (TYPE) -> Valid<MAPPED>): PropertyDelegate.Standard<MAPPED> = PropertyDelegateImpl(key, prefix, sensitive, addToProperties, { k, s -> construct.invoke(k, s).mapValid(mappedTypeName) { value -> convert.invoke(value) } })
|
||||
}
|
||||
|
||||
private class RequiredPropertyDelegateImpl<TYPE>(private val key: String?, private val prefix: String?, private val sensitive: Boolean = false, private val addToProperties: (Configuration.Property.Definition<*>) -> Unit, private val construct: (String, Boolean) -> Configuration.Property.Definition.Required<TYPE>) : PropertyDelegate.Required<TYPE> {
|
||||
|
||||
override operator fun provideDelegate(thisRef: Any?, property: KProperty<*>): ReadOnlyProperty<Any?, Configuration.Property.Definition.Required<TYPE>> {
|
||||
|
||||
val shortName = key ?: property.name
|
||||
val prop = construct.invoke(prefix?.let { "$prefix.$shortName" } ?: shortName, sensitive).also(addToProperties)
|
||||
return object : ReadOnlyProperty<Any?, Configuration.Property.Definition.Required<TYPE>> {
|
||||
|
||||
override fun getValue(thisRef: Any?, property: KProperty<*>): Configuration.Property.Definition.Required<TYPE> = prop
|
||||
}
|
||||
}
|
||||
|
||||
override fun optional(): PropertyDelegate.Optional<TYPE> = OptionalPropertyDelegateImpl(key, prefix, sensitive, addToProperties, { k, s -> construct.invoke(k, s).optional() })
|
||||
}
|
||||
|
||||
private class OptionalPropertyDelegateImpl<TYPE>(private val key: String?, private val prefix: String?, private val sensitive: Boolean = false, private val addToProperties: (Configuration.Property.Definition<*>) -> Unit, private val construct: (String, Boolean) -> Configuration.Property.Definition.Optional<TYPE>) : PropertyDelegate.Optional<TYPE> {
|
||||
@ -109,17 +133,19 @@ private class OptionalWithDefaultPropertyDelegateImpl<TYPE>(private val key: Str
|
||||
}
|
||||
}
|
||||
|
||||
private class ListPropertyDelegateImpl<TYPE>(private val key: String?, private val prefix: String?, private val sensitive: Boolean = false, private val addToProperties: (Configuration.Property.Definition<*>) -> Unit, private val construct: (String, Boolean) -> Configuration.Property.Definition.Required<TYPE>) : PropertyDelegate.Required<TYPE> {
|
||||
private class ListPropertyDelegateImpl<TYPE>(private val key: String?, private val prefix: String?, private val sensitive: Boolean = false, private val addToProperties: (Configuration.Property.Definition<*>) -> Unit, private val construct: (String, Boolean) -> Configuration.Property.Definition.RequiredList<TYPE>) : PropertyDelegate.RequiredList<TYPE> {
|
||||
|
||||
override operator fun provideDelegate(thisRef: Any?, property: KProperty<*>): ReadOnlyProperty<Any?, Configuration.Property.Definition.Required<TYPE>> {
|
||||
override operator fun provideDelegate(thisRef: Any?, property: KProperty<*>): ReadOnlyProperty<Any?, Configuration.Property.Definition.RequiredList<TYPE>> {
|
||||
|
||||
val shortName = key ?: property.name
|
||||
val prop = construct.invoke(prefix?.let { "$prefix.$shortName" } ?: shortName, sensitive).also(addToProperties)
|
||||
return object : ReadOnlyProperty<Any?, Configuration.Property.Definition.Required<TYPE>> {
|
||||
return object : ReadOnlyProperty<Any?, Configuration.Property.Definition.RequiredList<TYPE>> {
|
||||
|
||||
override fun getValue(thisRef: Any?, property: KProperty<*>): Configuration.Property.Definition.Required<TYPE> = prop
|
||||
override fun getValue(thisRef: Any?, property: KProperty<*>): Configuration.Property.Definition.RequiredList<TYPE> = prop
|
||||
}
|
||||
}
|
||||
|
||||
override fun optional(): PropertyDelegate.Optional<TYPE> = OptionalPropertyDelegateImpl(key, prefix, sensitive, addToProperties, { k, s -> construct.invoke(k, s).optional() })
|
||||
override fun optional(): PropertyDelegate.Optional<List<TYPE>> = OptionalPropertyDelegateImpl(key, prefix, sensitive, addToProperties, { k, s -> construct.invoke(k, s).optional() })
|
||||
|
||||
override fun <MAPPED> mapValid(mappedTypeName: String, convert: (List<TYPE>) -> Valid<MAPPED>): PropertyDelegate.Required<MAPPED> = RequiredPropertyDelegateImpl(key, prefix, sensitive, addToProperties, { k, s -> construct.invoke(k, s).mapValid(mappedTypeName) { value -> convert.invoke(value) } })
|
||||
}
|
@ -3,13 +3,23 @@ package net.corda.common.configuration.parsing.internal
|
||||
import com.typesafe.config.*
|
||||
import net.corda.common.validation.internal.Validated
|
||||
|
||||
inline fun <TYPE, reified MAPPED : Any> Configuration.Property.Definition.Standard<TYPE>.mapValid(noinline convert: (TYPE) -> Valid<MAPPED>): Configuration.Property.Definition.Standard<MAPPED> = mapValid(MAPPED::class.java.simpleName, convert)
|
||||
|
||||
inline fun <reified ENUM : Enum<ENUM>, VALUE : Any> Configuration.Specification<VALUE>.enum(key: String? = null, sensitive: Boolean = false): PropertyDelegate.Standard<ENUM> = enum(key, ENUM::class, sensitive)
|
||||
|
||||
inline fun <TYPE, reified MAPPED : Any> PropertyDelegate.Standard<TYPE>.mapValid(noinline convert: (TYPE) -> Valid<MAPPED>): PropertyDelegate.Standard<MAPPED> = mapValid(MAPPED::class.java.simpleName, convert)
|
||||
inline fun <TYPE, reified MAPPED> PropertyDelegate.Standard<TYPE>.mapValid(noinline convert: (TYPE) -> Valid<MAPPED>): PropertyDelegate.Standard<MAPPED> = mapValid(MAPPED::class.java.simpleName, convert)
|
||||
|
||||
inline fun <TYPE, reified MAPPED : Any> PropertyDelegate.Standard<TYPE>.map(noinline convert: (TYPE) -> MAPPED): PropertyDelegate.Standard<MAPPED> = map(MAPPED::class.java.simpleName, convert)
|
||||
inline fun <TYPE, reified MAPPED> PropertyDelegate.Standard<TYPE>.map(noinline convert: (TYPE) -> MAPPED): PropertyDelegate.Standard<MAPPED> = map(MAPPED::class.java.simpleName, convert)
|
||||
|
||||
inline fun <TYPE, reified MAPPED> PropertyDelegate.RequiredList<TYPE>.mapValid(noinline convert: (List<TYPE>) -> Valid<MAPPED>): PropertyDelegate.Required<MAPPED> = mapValid(MAPPED::class.java.simpleName, convert)
|
||||
|
||||
inline fun <TYPE, reified MAPPED> PropertyDelegate.RequiredList<TYPE>.map(noinline convert: (List<TYPE>) -> MAPPED): PropertyDelegate.Required<MAPPED> = map(MAPPED::class.java.simpleName, convert)
|
||||
|
||||
inline fun <TYPE, reified MAPPED> Configuration.Property.Definition.Standard<TYPE>.mapValid(noinline convert: (TYPE) -> Valid<MAPPED>): Configuration.Property.Definition.Standard<MAPPED> = mapValid(MAPPED::class.java.simpleName, convert)
|
||||
|
||||
inline fun <TYPE, reified MAPPED> Configuration.Property.Definition.Standard<TYPE>.map(noinline convert: (TYPE) -> MAPPED): Configuration.Property.Definition.Standard<MAPPED> = map(MAPPED::class.java.simpleName, convert)
|
||||
|
||||
inline fun <TYPE, reified MAPPED> Configuration.Property.Definition.RequiredList<TYPE>.mapValid(noinline convert: (List<TYPE>) -> Valid<MAPPED>): Configuration.Property.Definition.Required<MAPPED> = mapValid(MAPPED::class.java.simpleName, convert)
|
||||
|
||||
inline fun <TYPE, reified MAPPED> Configuration.Property.Definition.RequiredList<TYPE>.map(noinline convert: (List<TYPE>) -> MAPPED): Configuration.Property.Definition.Required<MAPPED> = map(MAPPED::class.java.simpleName, convert)
|
||||
|
||||
operator fun <TYPE> Config.get(property: Configuration.Property.Definition<TYPE>): TYPE = property.valueIn(this)
|
||||
|
||||
|
@ -4,6 +4,7 @@ import com.typesafe.config.ConfigException
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
class PropertyTest {
|
||||
|
||||
@ -56,6 +57,68 @@ class PropertyTest {
|
||||
assertThat(property.valueIn(configuration)).isEqualTo(value)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun present_value_of_list_type_with_whole_list_mapping() {
|
||||
|
||||
val key = "a.b.c"
|
||||
val value = listOf(1L, 3L, 2L)
|
||||
val configuration = configObject(key to value).toConfig()
|
||||
|
||||
val property = Configuration.Property.Definition.long(key).list().map { list -> list.max() }
|
||||
println(property)
|
||||
|
||||
assertThat(property.key).isEqualTo(key)
|
||||
assertThat(property.isMandatory).isTrue()
|
||||
assertThat(property.isSpecifiedBy(configuration)).isTrue()
|
||||
assertThat(property.valueIn(configuration)).isEqualTo(value.max())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun absent_value_of_list_type_with_whole_list_mapping() {
|
||||
|
||||
val key = "a.b.c"
|
||||
val configuration = configObject().toConfig()
|
||||
|
||||
val property = Configuration.Property.Definition.long(key).list().map { list -> list.max() }.optional()
|
||||
println(property)
|
||||
|
||||
assertThat(property.key).isEqualTo(key)
|
||||
assertThat(property.isMandatory).isFalse()
|
||||
assertThat(property.isSpecifiedBy(configuration)).isFalse()
|
||||
assertThat(property.valueIn(configuration)).isEqualTo(null)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun present_value_of_list_type_with_single_element_and_whole_list_mapping() {
|
||||
|
||||
val key = "a.b.c"
|
||||
val value = listOf(1L, 3L, 2L)
|
||||
val configuration = configObject(key to value).toConfig()
|
||||
|
||||
val property = Configuration.Property.Definition.long(key).map(::AtomicLong).list().map { list -> list.map(AtomicLong::get).max() }
|
||||
println(property)
|
||||
|
||||
assertThat(property.key).isEqualTo(key)
|
||||
assertThat(property.isMandatory).isTrue()
|
||||
assertThat(property.isSpecifiedBy(configuration)).isTrue()
|
||||
assertThat(property.valueIn(configuration)).isEqualTo(value.max())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun absent_value_of_list_type_with_single_element_and_whole_list_mapping() {
|
||||
|
||||
val key = "a.b.c"
|
||||
val configuration = configObject().toConfig()
|
||||
|
||||
val property = Configuration.Property.Definition.long(key).map(::AtomicLong).list().map { list -> list.map(AtomicLong::get).max() }.optional()
|
||||
println(property)
|
||||
|
||||
assertThat(property.key).isEqualTo(key)
|
||||
assertThat(property.isMandatory).isFalse()
|
||||
assertThat(property.isSpecifiedBy(configuration)).isFalse()
|
||||
assertThat(property.valueIn(configuration)).isEqualTo(null)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun optional_present_value_of_list_type() {
|
||||
|
||||
|
@ -83,6 +83,48 @@ class PropertyValidationTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun whole_list_validation_valid_value() {
|
||||
|
||||
val key = "a.b.c"
|
||||
val value = listOf(1L, 2L, 3L)
|
||||
val configuration = configObject(key to value).toConfig()
|
||||
|
||||
fun parseMax(list: List<Long>): Valid<Long?> = valid(list.max())
|
||||
|
||||
val property = Configuration.Property.Definition.long(key).list().mapValid(::parseMax)
|
||||
|
||||
assertThat(property.validate(configuration).errors).isEmpty()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun whole_list_validation_invalid_value() {
|
||||
|
||||
val key = "a.b.c"
|
||||
val value = listOf(1L, 2L, 3L)
|
||||
val configuration = configObject(key to value).toConfig()
|
||||
|
||||
fun parseMax(list: List<Long>): Valid<Long?> {
|
||||
|
||||
if (list.any { value -> value <= 1L }) {
|
||||
return invalid(Configuration.Validation.Error.BadValue.of("All values must be greater than 1"))
|
||||
}
|
||||
return valid(list.max())
|
||||
}
|
||||
|
||||
val property = Configuration.Property.Definition.long(key).list().mapValid(::parseMax)
|
||||
|
||||
assertThat(property.validate(configuration).errors).satisfies { errors ->
|
||||
|
||||
assertThat(errors).hasSize(1)
|
||||
assertThat(errors.first()).isInstanceOfSatisfying(Configuration.Validation.Error.BadValue::class.java) { error ->
|
||||
|
||||
assertThat(error.keyName).isEqualTo(key.split(".").last())
|
||||
assertThat(error.path).containsExactly(*key.split(".").toTypedArray())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun wrong_type() {
|
||||
|
||||
|
@ -1,8 +1,11 @@
|
||||
package net.corda.common.configuration.parsing.internal
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import net.corda.common.validation.internal.Validated
|
||||
import net.corda.common.validation.internal.Validated.Companion.invalid
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
class SpecificationTest {
|
||||
|
||||
@ -50,6 +53,28 @@ class SpecificationTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun parse_list_aggregation() {
|
||||
|
||||
val spec = object : Configuration.Specification<AtomicLong>("AtomicLong") {
|
||||
|
||||
private val maxElement by long("elements").list().map { elements -> elements.max() }
|
||||
|
||||
override fun parseValid(configuration: Config): Valid<AtomicLong> {
|
||||
|
||||
return valid(AtomicLong(configuration[maxElement]!!))
|
||||
}
|
||||
}
|
||||
|
||||
val elements = listOf(1L, 10L, 2L)
|
||||
val configuration = configObject("elements" to elements).toConfig()
|
||||
|
||||
val result = spec.parse(configuration)
|
||||
|
||||
assertThat(result.isValid).isTrue()
|
||||
assertThat(result.value().get()).isEqualTo(elements.max())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun validate() {
|
||||
|
||||
@ -68,6 +93,43 @@ class SpecificationTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun validate_list_aggregation() {
|
||||
|
||||
fun parseMax(elements: List<Long>): Valid<Long> {
|
||||
|
||||
if (elements.isEmpty()) {
|
||||
return invalid(Configuration.Validation.Error.BadValue.of("element list cannot be empty"))
|
||||
}
|
||||
if (elements.any { element -> element <= 1 }) {
|
||||
return invalid(Configuration.Validation.Error.BadValue.of("elements cannot be less than or equal to 1"))
|
||||
}
|
||||
return valid(elements.max()!!)
|
||||
}
|
||||
|
||||
val spec = object : Configuration.Specification<AtomicLong>("AtomicLong") {
|
||||
|
||||
private val maxElement by long("elements").list().mapValid(::parseMax)
|
||||
|
||||
override fun parseValid(configuration: Config): Valid<AtomicLong> {
|
||||
|
||||
return valid(AtomicLong(configuration[maxElement]))
|
||||
}
|
||||
}
|
||||
|
||||
val elements = listOf(1L, 10L, 2L)
|
||||
val configuration = configObject("elements" to elements).toConfig()
|
||||
|
||||
val result = spec.parse(configuration)
|
||||
|
||||
assertThat(result.errors).hasSize(1)
|
||||
assertThat(result.errors.first()).isInstanceOfSatisfying(Configuration.Validation.Error.BadValue::class.java) { error ->
|
||||
|
||||
assertThat(error.path).containsExactly("elements")
|
||||
assertThat(error.message).contains("elements cannot be less than or equal to 1")
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun validate_with_domain_specific_errors() {
|
||||
|
||||
|
@ -7,12 +7,13 @@ import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.BackpressureAwareTimedFlow
|
||||
import net.corda.core.internal.FetchDataFlow
|
||||
import net.corda.core.internal.TimedFlow
|
||||
import net.corda.core.internal.notary.generateSignature
|
||||
import net.corda.core.internal.notary.validateSignatures
|
||||
import net.corda.core.internal.pushToLoggingContext
|
||||
import net.corda.core.transactions.ContractUpgradeWireTransaction
|
||||
import net.corda.core.transactions.ReferenceStateRef
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
@ -36,7 +37,7 @@ class NotaryFlow {
|
||||
open class Client(
|
||||
private val stx: SignedTransaction,
|
||||
override val progressTracker: ProgressTracker
|
||||
) : FlowLogic<List<TransactionSignature>>(), TimedFlow {
|
||||
) : BackpressureAwareTimedFlow<List<TransactionSignature>>() {
|
||||
constructor(stx: SignedTransaction) : this(stx, tracker())
|
||||
|
||||
companion object {
|
||||
@ -90,7 +91,7 @@ class NotaryFlow {
|
||||
private fun sendAndReceiveValidating(session: FlowSession, signature: NotarisationRequestSignature): UntrustworthyData<NotarisationResponse> {
|
||||
val payload = NotarisationPayload(stx, signature)
|
||||
subFlow(NotarySendTransactionFlow(session, payload))
|
||||
return session.receive()
|
||||
return receiveResultOrTiming(session)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -98,10 +99,11 @@ class NotaryFlow {
|
||||
val ctx = stx.coreTransaction
|
||||
val tx = when (ctx) {
|
||||
is ContractUpgradeWireTransaction -> ctx.buildFilteredTransaction()
|
||||
is WireTransaction -> ctx.buildFilteredTransaction(Predicate { it is StateRef || it is TimeWindow || it == notaryParty })
|
||||
is WireTransaction -> ctx.buildFilteredTransaction(Predicate { it is StateRef || it is ReferenceStateRef || it is TimeWindow || it == notaryParty })
|
||||
else -> ctx
|
||||
}
|
||||
return session.sendAndReceiveWithRetry(NotarisationPayload(tx, signature))
|
||||
session.send(NotarisationPayload(tx, signature))
|
||||
return receiveResultOrTiming(session)
|
||||
}
|
||||
|
||||
/** Checks that the notary's signature(s) is/are valid. */
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* A notarisation request specifies a list of states to consume and the id of the consuming transaction. Its primary
|
||||
@ -80,4 +81,8 @@ data class NotarisationPayload(val transaction: Any, val requestSignature: Notar
|
||||
|
||||
/** Payload returned by the notary service flow to the client. */
|
||||
@CordaSerializable
|
||||
data class NotarisationResponse(val signatures: List<TransactionSignature>)
|
||||
data class NotarisationResponse(val signatures: List<TransactionSignature>)
|
||||
|
||||
/** Sent by the notary when the notary detects it will unlikely respond before the client retries. */
|
||||
@CordaSerializable
|
||||
data class WaitTimeUpdate(val waitTime: Duration)
|
||||
|
@ -0,0 +1,36 @@
|
||||
package net.corda.core.internal
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.WaitTimeUpdate
|
||||
import net.corda.core.utilities.UntrustworthyData
|
||||
|
||||
const val MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE = 4
|
||||
|
||||
/**
|
||||
* Implementation of TimedFlow that can handle WaitTimeUpdate messages. Any flow talking to the notary should implement this and use
|
||||
* explicit send and this class's receiveResultOrTiming to receive the response to handle cases where the notary sends a timeout update.
|
||||
*
|
||||
* This is handling the special case of the notary where the notary service will have an internal queue on the uniqueness provider and we
|
||||
* want to stop retries overwhelming that internal queue. As the TimedFlow mechanism and the notary service back-pressure are very specific
|
||||
* to this use case at the moment, this implementation is internal and not for general use.
|
||||
*/
|
||||
abstract class BackpressureAwareTimedFlow<ResultType> : FlowLogic<ResultType>(), TimedFlow {
|
||||
@Suspendable
|
||||
inline fun <reified ReceiveType> receiveResultOrTiming(session: FlowSession): UntrustworthyData<ReceiveType> {
|
||||
while (true) {
|
||||
val wrappedResult = session.receive<Any>()
|
||||
val unwrapped = wrappedResult.fromUntrustedWorld
|
||||
when {
|
||||
unwrapped is WaitTimeUpdate -> {
|
||||
logger.info("Counterparty [${session.counterparty}] is busy - TimedFlow $runId has been asked to wait for an additional ${unwrapped.waitTime} seconds for completion.")
|
||||
stateMachine.updateTimedFlowTimeout(unwrapped.waitTime.seconds)
|
||||
}
|
||||
unwrapped is ReceiveType -> @Suppress("UNCHECKED_CAST") // The compiler doesn't understand it's checked in the line above
|
||||
return wrappedResult as UntrustworthyData<ReceiveType>
|
||||
else -> throw throw IllegalArgumentException("We were expecting a ${ReceiveType::class.java.name} or WaitTimeUpdate but we instead got a ${unwrapped.javaClass.name} ($unwrapped)")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -15,7 +15,7 @@ import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import org.slf4j.MDC
|
||||
|
||||
// *Internal* Corda-specific utilities
|
||||
// *Internal* Corda-specific utilities.
|
||||
|
||||
const val PLATFORM_VERSION = 4
|
||||
|
||||
@ -33,13 +33,13 @@ fun checkMinimumPlatformVersion(minimumPlatformVersion: Int, requiredMinPlatform
|
||||
}
|
||||
}
|
||||
|
||||
/** Provide access to internal method for AttachmentClassLoaderTests */
|
||||
/** Provide access to internal method for AttachmentClassLoaderTests. */
|
||||
@DeleteForDJVM
|
||||
fun TransactionBuilder.toWireTransaction(services: ServicesForResolution, serializationContext: SerializationContext): WireTransaction {
|
||||
return toWireTransactionWithContext(services, serializationContext)
|
||||
}
|
||||
|
||||
/** Provide access to internal method for AttachmentClassLoaderTests */
|
||||
/** Provide access to internal method for AttachmentClassLoaderTests. */
|
||||
@DeleteForDJVM
|
||||
fun TransactionBuilder.toLedgerTransaction(services: ServicesForResolution, serializationContext: SerializationContext): LedgerTransaction {
|
||||
return toLedgerTransactionWithContext(services, serializationContext)
|
||||
|
@ -36,6 +36,8 @@ interface FlowStateMachine<FLOWRETURN> {
|
||||
@Suspendable
|
||||
fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>)
|
||||
|
||||
fun updateTimedFlowTimeout(timeoutSeconds: Long)
|
||||
|
||||
val logic: FlowLogic<FLOWRETURN>
|
||||
val serviceHub: ServiceHub
|
||||
val logger: Logger
|
||||
|
@ -4,10 +4,23 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.NotarisationPayload
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotarisationResponse
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.flows.WaitTimeUpdate
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.IdempotentFlow
|
||||
import net.corda.core.internal.MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.unwrap
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* A flow run by a notary service that handles notarisation requests.
|
||||
@ -16,16 +29,30 @@ import net.corda.core.utilities.unwrap
|
||||
* if any of the input states have been previously committed.
|
||||
*
|
||||
* Additional transaction validation logic can be added when implementing [validateRequest].
|
||||
*
|
||||
* @param otherSideSession The session with the notary client.
|
||||
* @param service The notary service to utilise.
|
||||
* @param etaThreshold If the ETA for processing the request, according to the service, is greater than this, notify the client.
|
||||
*/
|
||||
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic<Void?>(), IdempotentFlow {
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService, private val etaThreshold: Duration) : FlowLogic<Void?>(), IdempotentFlow {
|
||||
companion object {
|
||||
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
|
||||
private const val maxAllowedInputsAndReferences = 10_000
|
||||
|
||||
/**
|
||||
* This is default wait time estimate for notaries/uniqueness providers that do not estimate wait times.
|
||||
* Also used as default eta message threshold so that a default wait time/default threshold will never
|
||||
* lead to an update message being sent.
|
||||
*/
|
||||
val defaultEstimatedWaitTime: Duration = 10.seconds
|
||||
}
|
||||
|
||||
private var transactionId: SecureHash? = null
|
||||
|
||||
@Suspendable
|
||||
private fun counterpartyCanHandleBackPressure() = otherSideSession.getCounterpartyFlowInfo(true).flowVersion >= MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Void? {
|
||||
check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) {
|
||||
@ -40,6 +67,11 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
|
||||
|
||||
verifyTransaction(requestPayload)
|
||||
|
||||
val eta = service.getEstimatedWaitTime(tx.inputs.size + tx.references.size)
|
||||
if (eta > etaThreshold && counterpartyCanHandleBackPressure()) {
|
||||
otherSideSession.send(WaitTimeUpdate(eta))
|
||||
}
|
||||
|
||||
service.commitInputStates(
|
||||
tx.inputs,
|
||||
tx.id,
|
||||
|
@ -4,7 +4,11 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignableData
|
||||
import net.corda.core.crypto.SignatureMetadata
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
@ -14,6 +18,7 @@ import net.corda.core.internal.notary.UniquenessProvider.Result
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.slf4j.Logger
|
||||
import java.time.Duration
|
||||
|
||||
/** Base implementation for a notary service operated by a singe party. */
|
||||
abstract class SinglePartyNotaryService : NotaryService() {
|
||||
@ -42,6 +47,7 @@ abstract class SinglePartyNotaryService : NotaryService() {
|
||||
|
||||
val callingFlow = FlowLogic.currentTopLevel
|
||||
?: throw IllegalStateException("This method should be invoked in a flow context.")
|
||||
|
||||
val result = callingFlow.executeAsync(
|
||||
CommitOperation(
|
||||
this,
|
||||
@ -59,6 +65,13 @@ abstract class SinglePartyNotaryService : NotaryService() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Estimate the wait time to be notarised taking into account the new request size.
|
||||
*
|
||||
* @param numStates The number of states we're about to request be notarised.
|
||||
*/
|
||||
fun getEstimatedWaitTime(numStates: Int): Duration = uniquenessProvider.getEta(numStates)
|
||||
|
||||
/**
|
||||
* Required for the flow to be able to suspend until the commit is complete.
|
||||
* This object will be included in the flow checkpoint.
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.Party
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* A service that records input states of the given transaction and provides conflict information
|
||||
@ -23,6 +24,18 @@ interface UniquenessProvider {
|
||||
references: List<StateRef> = emptyList()
|
||||
): CordaFuture<Result>
|
||||
|
||||
/**
|
||||
* Estimated time of request processing. A uniqueness provider that is aware of their own throughput can return
|
||||
* an estimate how long requests will be queued before they can be processed. Notary services use this information
|
||||
* to potentially update clients with an expected wait time in order to avoid spamming by retries when the notary
|
||||
* gets busy.
|
||||
*
|
||||
* @param numStates The number of states (input + reference) in the new request, to be added to the pending count.
|
||||
*/
|
||||
fun getEta(numStates: Int): Duration {
|
||||
return NotaryServiceFlow.defaultEstimatedWaitTime
|
||||
}
|
||||
|
||||
/** The outcome of committing a transaction. */
|
||||
sealed class Result {
|
||||
/** Indicates that all input states have been committed successfully. */
|
||||
@ -30,4 +43,4 @@ interface UniquenessProvider {
|
||||
/** Indicates that the transaction has not been committed. */
|
||||
data class Failure(val error: NotaryError) : Result()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -147,7 +147,9 @@ class FilteredTransaction internal constructor(
|
||||
wtx.attachments.forEachIndexed { internalIndex, it -> filter(it, ATTACHMENTS_GROUP.ordinal, internalIndex) }
|
||||
if (wtx.notary != null) filter(wtx.notary, NOTARY_GROUP.ordinal, 0)
|
||||
if (wtx.timeWindow != null) filter(wtx.timeWindow, TIMEWINDOW_GROUP.ordinal, 0)
|
||||
wtx.references.forEachIndexed { internalIndex, it -> filter(it, REFERENCES_GROUP.ordinal, internalIndex) }
|
||||
// Note that because [inputs] and [references] share the same type [StateRef], we use a wrapper for references [ReferenceStateRef],
|
||||
// when filtering. Thus, to filter-in all [references] based on type, one should use the wrapper type [ReferenceStateRef] and not [StateRef].
|
||||
wtx.references.forEachIndexed { internalIndex, it -> filter(ReferenceStateRef(it), REFERENCES_GROUP.ordinal, internalIndex) }
|
||||
// It is highlighted that because there is no a signers property in TraversableTransaction,
|
||||
// one cannot specifically filter them in or out.
|
||||
// The above is very important to ensure someone won't filter out the signers component group if at least one
|
||||
@ -344,3 +346,8 @@ class ComponentVisibilityException(val id: SecureHash, val reason: String) : Cor
|
||||
@KeepForDJVM
|
||||
@CordaSerializable
|
||||
class FilteredTransactionVerificationException(val id: SecureHash, val reason: String) : CordaException("Transaction with id:$id cannot be verified. Reason: $reason")
|
||||
|
||||
/** Wrapper over [StateRef] to be used when filtering reference states. */
|
||||
@KeepForDJVM
|
||||
@CordaSerializable
|
||||
data class ReferenceStateRef(val stateRef: StateRef)
|
||||
|
@ -8,11 +8,13 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.ReferenceStateRef
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.node.services.api.IdentityServiceInternal
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
@ -59,9 +61,14 @@ class PartialMerkleTreeTest {
|
||||
hashed = nodes.map { it.serialize().sha256() }
|
||||
expectedRoot = MerkleTree.getMerkleTree(hashed.toMutableList() + listOf(zeroHash, zeroHash)).hash
|
||||
merkleTree = MerkleTree.getMerkleTree(hashed)
|
||||
testLedger = MockServices(emptyList(), MEGA_CORP.name, rigorousMock<IdentityServiceInternal>().also {
|
||||
doReturn(MEGA_CORP).whenever(it).partyFromKey(MEGA_CORP_PUBKEY)
|
||||
}).ledger(DUMMY_NOTARY) {
|
||||
|
||||
testLedger = MockServices(
|
||||
cordappPackages = emptyList(),
|
||||
initialIdentity = TestIdentity(MEGA_CORP.name),
|
||||
identityService = rigorousMock<IdentityServiceInternal>().also {
|
||||
doReturn(MEGA_CORP).whenever(it).partyFromKey(MEGA_CORP_PUBKEY) },
|
||||
networkParameters = testNetworkParameters(minimumPlatformVersion = 4)
|
||||
).ledger(DUMMY_NOTARY) {
|
||||
unverifiedTransaction {
|
||||
attachments(Cash.PROGRAM_ID)
|
||||
output(Cash.PROGRAM_ID, "MEGA_CORP cash",
|
||||
@ -76,6 +83,7 @@ class PartialMerkleTreeTest {
|
||||
transaction {
|
||||
attachments(Cash.PROGRAM_ID)
|
||||
input("MEGA_CORP cash")
|
||||
reference("dummy cash 1")
|
||||
output(Cash.PROGRAM_ID, "MEGA_CORP cash".output<Cash.State>().copy(owner = MINI_CORP))
|
||||
command(MEGA_CORP_PUBKEY, Cash.Commands.Move())
|
||||
timeWindow(TEST_TX_TIME)
|
||||
@ -148,6 +156,7 @@ class PartialMerkleTreeTest {
|
||||
// the signers component is also sent (required for visibility purposes).
|
||||
assertEquals(5, ftx.filteredComponentGroups.size)
|
||||
assertEquals(1, ftx.inputs.size)
|
||||
assertEquals(0, ftx.references.size)
|
||||
assertEquals(0, ftx.attachments.size)
|
||||
assertEquals(1, ftx.outputs.size)
|
||||
assertEquals(1, ftx.commands.size)
|
||||
@ -173,6 +182,7 @@ class PartialMerkleTreeTest {
|
||||
assertTrue(ftxNothing.attachments.isEmpty())
|
||||
assertTrue(ftxNothing.commands.isEmpty())
|
||||
assertTrue(ftxNothing.inputs.isEmpty())
|
||||
assertTrue(ftxNothing.references.isEmpty())
|
||||
assertTrue(ftxNothing.outputs.isEmpty())
|
||||
assertNull(ftxNothing.timeWindow)
|
||||
assertTrue(ftxNothing.availableComponentGroups.flatten().isEmpty())
|
||||
@ -321,4 +331,21 @@ class PartialMerkleTreeTest {
|
||||
// The provided hash is not in the tree (using a leaf that didn't exist in the original Merkle tree).
|
||||
assertFailsWith<MerkleTreeException> { pmtAllIncluded.leafIndex(SecureHash.sha256("30")) }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `building Merkle for reference states only`() {
|
||||
fun filtering(elem: Any): Boolean {
|
||||
return when (elem) {
|
||||
is ReferenceStateRef -> true
|
||||
else -> false
|
||||
}
|
||||
}
|
||||
|
||||
val ftx = testTx.buildFilteredTransaction(Predicate(::filtering))
|
||||
|
||||
assertEquals(1, ftx.filteredComponentGroups.size)
|
||||
assertEquals(0, ftx.inputs.size)
|
||||
assertEquals(1, ftx.references.size)
|
||||
ftx.verify()
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
val CONTRACT_ID = "net.corda.core.transactions.ReferenceStateTests\$ExampleContract"
|
||||
const val CONTRACT_ID = "net.corda.core.transactions.ReferenceStateTests\$ExampleContract"
|
||||
|
||||
class ReferenceStateTests {
|
||||
private companion object {
|
||||
|
@ -277,6 +277,11 @@ Unreleased
|
||||
normal state when it occurs in an input or output position. *This feature is only available on Corda networks running
|
||||
with a minimum platform version of 4.*
|
||||
|
||||
* A new wrapper class over ``StateRef`` is introduced, called ``ReferenceStateRef``. Although "reference input states" are stored as
|
||||
``StateRef`` objects in ``WireTransaction``, we needed a way to distinguish between "input states" and "reference input states" when
|
||||
required to filter by object type. Thus, when one wants to filter-in all "reference input states" in a ``FilteredTransaction``
|
||||
then he/she should check if it is of type ``ReferenceStateRef``.
|
||||
|
||||
* Removed type parameter `U` from `tryLockFungibleStatesForSpending` to allow the function to be used with `FungibleState`
|
||||
as well as `FungibleAsset`. This _might_ cause a compile failure in some obscure cases due to the removal of the type
|
||||
parameter from the method. If your CorDapp does specify types explicitly when using this method then updating the types
|
||||
|
@ -3,6 +3,7 @@ package net.corda.notary.raft
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.NonValidatingNotaryFlow
|
||||
import net.corda.node.services.transactions.ValidatingNotaryFlow
|
||||
@ -36,8 +37,8 @@ class RaftNotaryService(
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {
|
||||
return if (notaryConfig.validating) {
|
||||
ValidatingNotaryFlow(otherPartySession, this)
|
||||
} else NonValidatingNotaryFlow(otherPartySession, this)
|
||||
ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
|
||||
} else NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
|
@ -224,7 +224,7 @@ private fun <T : Enum<T>> enumBridge(clazz: Class<T>, name: String): T {
|
||||
*/
|
||||
fun Any.toConfig(): Config = ConfigValueFactory.fromMap(toConfigMap()).toConfig()
|
||||
|
||||
fun Any.toConfigValue(): ConfigValue = if (this is ConfigValue) this else ConfigValueFactory.fromAnyRef(convertValue(this))
|
||||
fun Any?.toConfigValue(): ConfigValue = if (this is ConfigValue) this else if (this != null) ConfigValueFactory.fromAnyRef(convertValue(this)) else ConfigValueFactory.fromAnyRef(null)
|
||||
|
||||
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
|
||||
// Reflect over the fields of the receiver and generate a value Map that can use to create Config object.
|
||||
|
@ -26,7 +26,7 @@ internal class ValidateConfigurationCli : CliWrapperBase("validate-configuration
|
||||
return "for path: \"$pathAsString\": $message"
|
||||
}
|
||||
|
||||
internal fun logRawConfig(config: Config) = logger.debug("Actual configuration:\n${V1NodeConfigurationSpec.describe(config, Any::toConfigValue).render(configRenderingOptions)}")
|
||||
internal fun logRawConfig(config: Config) = logger.debug("Actual configuration:\n${V1NodeConfigurationSpec.describe(config, Any?::toConfigValue).render(configRenderingOptions)}")
|
||||
}
|
||||
|
||||
@Mixin
|
||||
|
@ -6,6 +6,7 @@ import net.corda.common.validation.internal.Validated
|
||||
import net.corda.core.context.AuthServiceId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.TimedFlow
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.config.rpc.NodeRpcOptions
|
||||
import net.corda.node.services.config.schema.v1.V1NodeConfigurationSpec
|
||||
@ -170,6 +171,12 @@ data class NotaryConfig(
|
||||
val serviceLegalName: CordaX500Name? = null,
|
||||
/** The name of the notary service class to load. */
|
||||
val className: String = "net.corda.node.services.transactions.SimpleNotaryService",
|
||||
/**
|
||||
* If the wait time estimate on the internal queue exceeds this value, the notary may send
|
||||
* a wait time update to the client (implementation specific and dependent on the counter
|
||||
* party version).
|
||||
*/
|
||||
val etaMessageThresholdSeconds: Int = NotaryServiceFlow.defaultEstimatedWaitTime.seconds.toInt(),
|
||||
/** Notary implementation-specific configuration parameters. */
|
||||
val extraConfig: Config? = null
|
||||
)
|
||||
|
@ -13,6 +13,7 @@ import net.corda.common.configuration.parsing.internal.nested
|
||||
import net.corda.common.validation.internal.Validated.Companion.invalid
|
||||
import net.corda.common.validation.internal.Validated.Companion.valid
|
||||
import net.corda.core.context.AuthServiceId
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.node.services.config.AuthDataSourceType
|
||||
import net.corda.node.services.config.CertChainPolicyConfig
|
||||
import net.corda.node.services.config.CertChainPolicyType
|
||||
@ -171,10 +172,11 @@ internal object NotaryConfigSpec : Configuration.Specification<NotaryConfig>("No
|
||||
private val validating by boolean()
|
||||
private val serviceLegalName by string().mapValid(::toCordaX500Name).optional()
|
||||
private val className by string().optional().withDefaultValue("net.corda.node.services.transactions.SimpleNotaryService")
|
||||
private val etaMessageThresholdSeconds by int().optional().withDefaultValue(NotaryServiceFlow.defaultEstimatedWaitTime.seconds.toInt())
|
||||
private val extraConfig by nestedObject().map(ConfigObject::toConfig).optional()
|
||||
|
||||
override fun parseValid(configuration: Config): Valid<NotaryConfig> {
|
||||
return valid(NotaryConfig(configuration[validating], configuration[serviceLegalName], configuration[className], configuration[extraConfig]))
|
||||
return valid(NotaryConfig(configuration[validating], configuration[serviceLegalName], configuration[className], configuration[etaMessageThresholdSeconds], configuration[extraConfig]))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,6 +60,7 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
|
||||
fun internalSchemas() = requiredSchemas.keys + extraSchemas.filter { schema -> // when mapped schemas from the finance module are present, they are considered as internal ones
|
||||
schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" ||
|
||||
schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" ||
|
||||
schema::class.qualifiedName == "net.corda.node.services.transactions.NodeNotarySchemaV1" ||
|
||||
schema::class.qualifiedName == "net.corda.notary.jpa.JPANotarySchemaV1"
|
||||
}
|
||||
|
||||
|
@ -72,7 +72,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
val stateMachine: StateMachine,
|
||||
val serviceHub: ServiceHubInternal,
|
||||
val checkpointSerializationContext: CheckpointSerializationContext,
|
||||
val unfinishedFibers: ReusableLatch
|
||||
val unfinishedFibers: ReusableLatch,
|
||||
val waitTimeUpdateHook: (id: StateMachineRunId, timeout: Long) -> Unit
|
||||
)
|
||||
|
||||
internal var transientValues: TransientReference<TransientValues>? = null
|
||||
@ -415,6 +416,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
return transientState!!.value
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook to allow a timed flow to update its own timeout (i.e. how long it can be suspended before it gets
|
||||
* retried.
|
||||
*/
|
||||
override fun updateTimedFlowTimeout(timeoutSeconds: Long) {
|
||||
getTransientField(TransientValues::waitTimeUpdateHook).invoke(id, timeoutSeconds)
|
||||
}
|
||||
|
||||
override val stateMachine get() = getTransientField(TransientValues::stateMachine)
|
||||
|
||||
/**
|
||||
|
@ -572,22 +572,54 @@ class MultiThreadedStateMachineManager(
|
||||
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
|
||||
scheduledTimeout.retryCount
|
||||
} else 0
|
||||
val scheduledFuture = scheduleTimeoutException(flow, retryCount)
|
||||
val scheduledFuture = scheduleTimeoutException(flow, calculateDefaultTimeoutSeconds(retryCount))
|
||||
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount + 1)
|
||||
} else {
|
||||
logger.warn("Unable to schedule timeout for flow $flowId – flow not found.")
|
||||
}
|
||||
}
|
||||
|
||||
private fun resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
|
||||
if (timeoutSeconds < serviceHub.configuration.flowTimeout.timeout.seconds) {
|
||||
logger.debug { "Ignoring request to set time-out on timed flow $flowId to $timeoutSeconds seconds which is shorter than default of ${serviceHub.configuration.flowTimeout.timeout.seconds} seconds." }
|
||||
return
|
||||
}
|
||||
logger.debug { "Processing request to set time-out on timed flow $flowId to $timeoutSeconds seconds." }
|
||||
concurrentBox.concurrent {
|
||||
resetCustomTimeout(flowId, timeoutSeconds)
|
||||
}
|
||||
}
|
||||
|
||||
private fun InnerState.resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
|
||||
val flow = flows[flowId]
|
||||
if (flow != null) {
|
||||
val scheduledTimeout = timedFlows[flowId]
|
||||
val retryCount = if (scheduledTimeout != null) {
|
||||
val timeoutFuture = scheduledTimeout.scheduledFuture
|
||||
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
|
||||
scheduledTimeout.retryCount
|
||||
} else 0
|
||||
val scheduledFuture = scheduleTimeoutException(flow, timeoutSeconds)
|
||||
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount)
|
||||
} else {
|
||||
logger.warn("Unable to schedule timeout for flow $flowId – flow not found.")
|
||||
}
|
||||
}
|
||||
|
||||
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
|
||||
private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> {
|
||||
private fun scheduleTimeoutException(flow: Flow, delay: Long): ScheduledFuture<*> {
|
||||
return with(serviceHub.configuration.flowTimeout) {
|
||||
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
|
||||
val jitteredDelaySeconds = maxOf(1L ,timeoutDelaySeconds/2 + (Random().nextDouble() * timeoutDelaySeconds/2).toLong())
|
||||
timeoutScheduler.schedule({
|
||||
val event = Event.Error(FlowTimeoutException(maxRestartCount))
|
||||
flow.fiber.scheduleEvent(event)
|
||||
}, jitteredDelaySeconds, TimeUnit.SECONDS)
|
||||
}, delay, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long {
|
||||
return with(serviceHub.configuration.flowTimeout) {
|
||||
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
|
||||
maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong())
|
||||
}
|
||||
}
|
||||
|
||||
@ -635,7 +667,8 @@ class MultiThreadedStateMachineManager(
|
||||
stateMachine = StateMachine(id, secureRandom),
|
||||
serviceHub = serviceHub,
|
||||
checkpointSerializationContext = checkpointSerializationContext!!,
|
||||
unfinishedFibers = unfinishedFibers
|
||||
unfinishedFibers = unfinishedFibers,
|
||||
waitTimeUpdateHook = { flowId, timeout -> resetCustomTimeout(flowId, timeout) }
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -13,7 +13,11 @@ import net.corda.core.flows.FlowInfo
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.TimedFlow
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.castIfPossible
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
@ -34,7 +38,11 @@ import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.config.shouldCheckCheckpoints
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
|
||||
import net.corda.node.services.statemachine.interceptors.*
|
||||
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
|
||||
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
|
||||
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
|
||||
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
|
||||
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
|
||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.injectOldProgressTracker
|
||||
@ -47,11 +55,13 @@ import org.apache.logging.log4j.LogManager
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.security.SecureRandom
|
||||
import java.util.*
|
||||
import java.util.concurrent.*
|
||||
import java.util.HashSet
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.collections.ArrayList
|
||||
import kotlin.collections.HashMap
|
||||
import kotlin.streams.toList
|
||||
|
||||
/**
|
||||
@ -579,22 +589,54 @@ class SingleThreadedStateMachineManager(
|
||||
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
|
||||
scheduledTimeout.retryCount
|
||||
} else 0
|
||||
val scheduledFuture = scheduleTimeoutException(flow, retryCount)
|
||||
val scheduledFuture = scheduleTimeoutException(flow, calculateDefaultTimeoutSeconds(retryCount))
|
||||
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount + 1)
|
||||
} else {
|
||||
logger.warn("Unable to schedule timeout for flow $flowId – flow not found.")
|
||||
}
|
||||
}
|
||||
|
||||
private fun resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
|
||||
if (timeoutSeconds < serviceHub.configuration.flowTimeout.timeout.seconds) {
|
||||
logger.debug { "Ignoring request to set time-out on timed flow $flowId to $timeoutSeconds seconds which is shorter than default of ${serviceHub.configuration.flowTimeout.timeout.seconds} seconds." }
|
||||
return
|
||||
}
|
||||
logger.debug { "Processing request to set time-out on timed flow $flowId to $timeoutSeconds seconds." }
|
||||
mutex.locked {
|
||||
resetCustomTimeout(flowId, timeoutSeconds)
|
||||
}
|
||||
}
|
||||
|
||||
private fun InnerState.resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
|
||||
val flow = flows[flowId]
|
||||
if (flow != null) {
|
||||
val scheduledTimeout = timedFlows[flowId]
|
||||
val retryCount = if (scheduledTimeout != null) {
|
||||
val timeoutFuture = scheduledTimeout.scheduledFuture
|
||||
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
|
||||
scheduledTimeout.retryCount
|
||||
} else 0
|
||||
val scheduledFuture = scheduleTimeoutException(flow, timeoutSeconds)
|
||||
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount)
|
||||
} else {
|
||||
logger.warn("Unable to schedule timeout for flow $flowId – flow not found.")
|
||||
}
|
||||
}
|
||||
|
||||
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
|
||||
private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> {
|
||||
private fun scheduleTimeoutException(flow: Flow, delay: Long): ScheduledFuture<*> {
|
||||
return with(serviceHub.configuration.flowTimeout) {
|
||||
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
|
||||
val jitteredDelaySeconds = maxOf(1L, timeoutDelaySeconds/2 + (Math.random() * timeoutDelaySeconds/2).toLong())
|
||||
timeoutScheduler.schedule({
|
||||
val event = Event.Error(FlowTimeoutException(maxRestartCount))
|
||||
flow.fiber.scheduleEvent(event)
|
||||
}, jitteredDelaySeconds, TimeUnit.SECONDS)
|
||||
}, delay, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long {
|
||||
return with(serviceHub.configuration.flowTimeout) {
|
||||
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
|
||||
maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong())
|
||||
}
|
||||
}
|
||||
|
||||
@ -642,7 +684,8 @@ class SingleThreadedStateMachineManager(
|
||||
stateMachine = StateMachine(id, secureRandom),
|
||||
serviceHub = serviceHub,
|
||||
checkpointSerializationContext = checkpointSerializationContext!!,
|
||||
unfinishedFibers = unfinishedFibers
|
||||
unfinishedFibers = unfinishedFibers,
|
||||
waitTimeUpdateHook = { flowId, timeout -> resetCustomTimeout(flowId, timeout) }
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,7 @@ import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.transactions.ContractUpgradeFilteredTransaction
|
||||
import net.corda.core.transactions.FilteredTransaction
|
||||
import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* The received transaction is not checked for contract-validity, as that would require fully
|
||||
@ -17,7 +18,7 @@ import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||
* the caller, it is possible to raise a dispute and verify the validity of the transaction and subsequently
|
||||
* undo the commit of the input states (the exact mechanism still needs to be worked out).
|
||||
*/
|
||||
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService, etaThreshold: Duration = defaultEstimatedWaitTime) : NotaryServiceFlow(otherSideSession, service, etaThreshold) {
|
||||
override fun extractParts(requestPayload: NotarisationPayload): TransactionParts {
|
||||
val tx = requestPayload.coreTransaction
|
||||
return when (tx) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import com.codahale.metrics.SlidingWindowReservoir
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
@ -12,8 +13,10 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.internal.elapsedTime
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.internal.notary.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.notary.validateTimeWindow
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
@ -27,11 +30,20 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.LinkedHashMap
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.*
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.EmbeddedId
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.GeneratedValue
|
||||
import javax.persistence.Id
|
||||
import javax.persistence.Lob
|
||||
import javax.persistence.MappedSuperclass
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
/** A RDBMS backed Uniqueness provider */
|
||||
@ -70,6 +82,14 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
var requestDate: Instant
|
||||
)
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_transactions")
|
||||
class CommittedTransaction(
|
||||
@Id
|
||||
@Column(name = "transaction_id", nullable = false, length = 64)
|
||||
val transactionId: String
|
||||
)
|
||||
|
||||
private data class CommitRequest(
|
||||
val states: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
@ -86,6 +106,34 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
private val commitLog = createMap(cacheFactory)
|
||||
|
||||
private val requestQueue = LinkedBlockingQueue<CommitRequest>(requestQueueSize)
|
||||
private val nrQueuedStates = AtomicInteger(0)
|
||||
|
||||
/**
|
||||
* Measured in states per minute, with a minimum of 1. We take an average of the last 100 commits.
|
||||
* Minutes was chosen to increase accuracy by 60x over seconds, given we have to use longs here.
|
||||
*/
|
||||
private val throughputHistory = SlidingWindowReservoir(100)
|
||||
@Volatile
|
||||
var throughput: Double = 0.0
|
||||
|
||||
/**
|
||||
* Estimated time of request processing.
|
||||
* This uses performance metrics to gauge how long the wait time for a newly queued state will probably be.
|
||||
* It checks that there is actual traffic going on (i.e. a non-zero number of states are queued and there
|
||||
* is actual throughput) and then returns the expected wait time scaled up by a factor of 2 to give a probable
|
||||
* upper bound.
|
||||
*
|
||||
* @param numStates The number of states (input + reference) we're about to request be notarised.
|
||||
*/
|
||||
override fun getEta(numStates: Int): Duration {
|
||||
val rate = throughput
|
||||
val nrStates = nrQueuedStates.getAndAdd(numStates)
|
||||
log.debug { "rate: $rate, queueSize: $nrStates" }
|
||||
if (rate > 0.0 && nrStates > 0) {
|
||||
return Duration.ofSeconds((2 * TimeUnit.MINUTES.toSeconds(1) * nrStates / rate).toLong())
|
||||
}
|
||||
return NotaryServiceFlow.defaultEstimatedWaitTime
|
||||
}
|
||||
|
||||
/** A request processor thread. */
|
||||
private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) {
|
||||
@ -190,13 +238,32 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
logRequest(txId, callerIdentity, requestSignature)
|
||||
val conflictingStates = findAlreadyCommitted(states, references, commitLog)
|
||||
if (conflictingStates.isNotEmpty()) {
|
||||
handleConflicts(txId, conflictingStates)
|
||||
if (states.isEmpty()) {
|
||||
handleReferenceConflicts(txId, conflictingStates)
|
||||
} else {
|
||||
handleConflicts(txId, conflictingStates)
|
||||
}
|
||||
} else {
|
||||
handleNoConflicts(timeWindow, states, txId, commitLog)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun previouslyCommitted(txId: SecureHash): Boolean {
|
||||
val session = currentDBSession()
|
||||
return session.find(CommittedTransaction::class.java, txId.toString()) != null
|
||||
}
|
||||
|
||||
private fun handleReferenceConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
|
||||
val session = currentDBSession()
|
||||
if (!previouslyCommitted(txId)) {
|
||||
val conflictError = NotaryError.Conflict(txId, conflictingStates)
|
||||
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
|
||||
throw NotaryInternalException(conflictError)
|
||||
}
|
||||
log.debug { "Transaction $txId already notarised" }
|
||||
}
|
||||
|
||||
private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
|
||||
if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
|
||||
log.debug { "Transaction $txId already notarised" }
|
||||
@ -214,15 +281,32 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
states.forEach { stateRef ->
|
||||
commitLog[stateRef] = txId
|
||||
}
|
||||
val session = currentDBSession()
|
||||
session.persist(CommittedTransaction(txId.toString()))
|
||||
log.debug { "Successfully committed all input states: $states" }
|
||||
} else {
|
||||
if (states.isEmpty() && previouslyCommitted(txId)) {
|
||||
return
|
||||
}
|
||||
throw NotaryInternalException(outsideTimeWindowError)
|
||||
}
|
||||
}
|
||||
|
||||
private fun decrementQueueSize(request: CommitRequest): Int {
|
||||
val nrStates = request.states.size + request.references.size
|
||||
nrQueuedStates.addAndGet(-nrStates)
|
||||
return nrStates
|
||||
}
|
||||
|
||||
private fun processRequest(request: CommitRequest) {
|
||||
val numStates = decrementQueueSize(request)
|
||||
try {
|
||||
commitOne(request.states, request.txId, request.callerIdentity, request.requestSignature, request.timeWindow, request.references)
|
||||
val duration = elapsedTime {
|
||||
commitOne(request.states, request.txId, request.callerIdentity, request.requestSignature, request.timeWindow, request.references)
|
||||
}
|
||||
val statesPerMinute = numStates.toLong() * TimeUnit.MINUTES.toNanos(1) / duration.toNanos()
|
||||
throughputHistory.update(maxOf(statesPerMinute, 1))
|
||||
throughput = throughputHistory.snapshot.median // Median deemed more stable / representative than mean.
|
||||
respondWithSuccess(request)
|
||||
} catch (e: Exception) {
|
||||
log.warn("Error processing commit request", e)
|
||||
@ -231,11 +315,11 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
}
|
||||
|
||||
private fun respondWithError(request: CommitRequest, exception: Exception) {
|
||||
if (exception is NotaryInternalException) {
|
||||
request.future.set(UniquenessProvider.Result.Failure(exception.error))
|
||||
} else {
|
||||
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
|
||||
}
|
||||
if (exception is NotaryInternalException) {
|
||||
request.future.set(UniquenessProvider.Result.Failure(exception.error))
|
||||
} else {
|
||||
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
|
||||
}
|
||||
}
|
||||
|
||||
private fun respondWithSuccess(request: CommitRequest) {
|
||||
|
@ -4,6 +4,7 @@ import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import java.security.PublicKey
|
||||
|
||||
@ -17,10 +18,10 @@ class SimpleNotaryService(override val services: ServiceHubInternal, override va
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {
|
||||
return if (notaryConfig.validating) {
|
||||
log.info("Starting in validating mode")
|
||||
ValidatingNotaryFlow(otherPartySession, this)
|
||||
ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
|
||||
} else {
|
||||
log.info("Starting in non-validating mode")
|
||||
NonValidatingNotaryFlow(otherPartySession, this)
|
||||
NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,7 +35,8 @@ object NodeNotarySchema
|
||||
object NodeNotarySchemaV1 : MappedSchema(schemaFamily = NodeNotarySchema.javaClass, version = 1,
|
||||
mappedTypes = listOf(PersistentUniquenessProvider.BaseComittedState::class.java,
|
||||
PersistentUniquenessProvider.Request::class.java,
|
||||
PersistentUniquenessProvider.CommittedState::class.java
|
||||
PersistentUniquenessProvider.CommittedState::class.java,
|
||||
PersistentUniquenessProvider.CommittedTransaction::class.java
|
||||
)) {
|
||||
override val migrationResource = "node-notary.changelog-master"
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionWithSignatures
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* A notary commit flow that makes sure a given transaction is valid before committing it. This does mean that the calling
|
||||
@ -19,7 +20,7 @@ import net.corda.core.transactions.WireTransaction
|
||||
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
|
||||
* indeed valid.
|
||||
*/
|
||||
open class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
open class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService, etaThreshold: Duration = defaultEstimatedWaitTime) : NotaryServiceFlow(otherSideSession, service, etaThreshold) {
|
||||
override fun extractParts(requestPayload: NotarisationPayload): TransactionParts {
|
||||
val stx = requestPayload.signedTransaction
|
||||
val timeWindow: TimeWindow? = if (stx.coreTransaction is WireTransaction) stx.tx.timeWindow else null
|
||||
|
@ -0,0 +1,14 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<changeSet author="R3.Corda" id="create-notary-committed-transactions-table">
|
||||
<createTable tableName="node_notary_committed_transactions">
|
||||
<column name="transaction_id" type="NVARCHAR(64)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
</createTable>
|
||||
<addPrimaryKey columnNames="transaction_id" constraintName="node_notary_transactions_pkey" tableName="node_notary_committed_transactions"/>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
@ -5,6 +5,9 @@
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<include file="migration/node-notary.changelog-init.xml"/>
|
||||
|
||||
<include file="migration/node-notary.changelog-v1.xml"/>
|
||||
<include file="migration/node-notary.changelog-pkey.xml"/>
|
||||
<include file="migration/node-notary.changelog-committed-transactions-table.xml" />
|
||||
|
||||
</databaseChangeLog>
|
||||
|
@ -6,22 +6,27 @@ import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.ValidatingNotaryFlow
|
||||
import net.corda.node.services.transactions.NonValidatingNotaryFlow
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
@ -30,37 +35,50 @@ import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.internal.GlobalDatabaseRule
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.node.*
|
||||
import net.corda.testing.node.internal.*
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.MockNetFlowTimeOut
|
||||
import net.corda.testing.node.MockNetNotaryConfig
|
||||
import net.corda.testing.node.MockNetworkParameters
|
||||
import net.corda.testing.node.MockNodeConfigOverrides
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.TestStartedNode
|
||||
import net.corda.testing.node.internal.cordappsForPackages
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.junit.Before
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.ExternalResource
|
||||
import org.junit.rules.RuleChain
|
||||
import org.slf4j.MDC
|
||||
import java.security.PublicKey
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.test.assertNotEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() {
|
||||
|
||||
lateinit var mockNet: InternalMockNetwork
|
||||
lateinit var notary: Party
|
||||
lateinit var node: TestStartedNode
|
||||
lateinit var patientNode: TestStartedNode
|
||||
|
||||
private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair<Party, TestStartedNode> {
|
||||
private fun startClusterAndNode(mockNet: InternalMockNetwork): Triple<Party, TestStartedNode, TestStartedNode> {
|
||||
val replicaIds = (0 until clusterSize)
|
||||
val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH")
|
||||
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
|
||||
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
|
||||
serviceLegalName)
|
||||
|
||||
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true))))
|
||||
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, false))))
|
||||
val notaryConfig = MockNetNotaryConfig(
|
||||
serviceLegalName = serviceLegalName,
|
||||
validating = true,
|
||||
className = TimedFlowTests.TestNotaryService::class.java.name)
|
||||
validating = false,
|
||||
className = TimedFlowTests.TestNotaryService::class.java.name
|
||||
)
|
||||
|
||||
val notaryNodes = (0 until clusterSize).map {
|
||||
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = MockNodeConfigOverrides(
|
||||
@ -71,18 +89,25 @@ class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() {
|
||||
val aliceNode = mockNet.createUnstartedNode(
|
||||
InternalMockNodeParameters(
|
||||
legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
|
||||
configOverrides = MockNodeConfigOverrides(
|
||||
flowTimeout = MockNetFlowTimeOut(10.seconds, 3, 1.0)
|
||||
)))
|
||||
configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(2.seconds, 3, 1.0))
|
||||
)
|
||||
)
|
||||
|
||||
val patientNode = mockNet.createUnstartedNode(
|
||||
InternalMockNodeParameters(
|
||||
legalName = CordaX500Name("Bob", "BobCorp", "GB"),
|
||||
configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(10.seconds, 3, 1.0))
|
||||
)
|
||||
)
|
||||
|
||||
// MockNetwork doesn't support notary clusters, so we create all the nodes we need unstarted, and then install the
|
||||
// network-parameters in their directories before they're started.
|
||||
val node = (notaryNodes + aliceNode).map { node ->
|
||||
val nodes = (notaryNodes + aliceNode + patientNode).map { node ->
|
||||
networkParameters.install(mockNet.baseDirectory(node.id))
|
||||
node.start()
|
||||
}.last()
|
||||
}
|
||||
|
||||
return Pair(notaryIdentity, node)
|
||||
return Triple(notaryIdentity, nodes[nodes.lastIndex - 1], nodes.last())
|
||||
}
|
||||
|
||||
|
||||
@ -95,6 +120,7 @@ class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() {
|
||||
val started = startClusterAndNode(mockNet)
|
||||
notary = started.first
|
||||
node = started.second
|
||||
patientNode = started.third
|
||||
}
|
||||
|
||||
override fun after() {
|
||||
@ -107,9 +133,13 @@ class TimedFlowTests {
|
||||
/** A shared counter across all notary service nodes. */
|
||||
var requestsReceived: AtomicInteger = AtomicInteger(0)
|
||||
|
||||
private val waitEtaThreshold: Duration = NotaryServiceFlow.defaultEstimatedWaitTime
|
||||
private var waitETA: Duration = waitEtaThreshold
|
||||
|
||||
private val notary by lazy { globalRule.notary }
|
||||
private val node by lazy { globalRule.node }
|
||||
private val patientNode by lazy { globalRule.patientNode }
|
||||
|
||||
|
||||
init {
|
||||
LogHelper.setLevel("+net.corda.flow", "+net.corda.testing.node", "+net.corda.node.services.messaging")
|
||||
@ -167,6 +197,70 @@ class TimedFlowTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `timed flow can update its ETA`() {
|
||||
try {
|
||||
waitETA = 10.minutes
|
||||
node.run {
|
||||
val issueTx = signInitialTransaction(notary) {
|
||||
setTimeWindow(services.clock.instant(), 30.seconds)
|
||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||
}
|
||||
val flow = NotaryFlow.Client(issueTx)
|
||||
val progressTracker = flow.progressTracker
|
||||
assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep)
|
||||
val progressTrackerDone = getDoneFuture(progressTracker)
|
||||
|
||||
val resultFuture = services.startFlow(flow).resultFuture
|
||||
var exceptionThrown = false
|
||||
try {
|
||||
resultFuture.get(3, TimeUnit.SECONDS)
|
||||
} catch (e: TimeoutException) {
|
||||
exceptionThrown = true
|
||||
}
|
||||
assertTrue(exceptionThrown)
|
||||
flow.stateMachine.updateTimedFlowTimeout(2)
|
||||
val notarySignatures = resultFuture.get(10, TimeUnit.SECONDS)
|
||||
(issueTx + notarySignatures).verifyRequiredSignatures()
|
||||
progressTrackerDone.get()
|
||||
}
|
||||
} finally {
|
||||
waitETA = waitEtaThreshold
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `timed flow cannot update its ETA to less than default`() {
|
||||
try {
|
||||
waitETA = 1.seconds
|
||||
patientNode.run {
|
||||
val issueTx = signInitialTransaction(notary) {
|
||||
setTimeWindow(services.clock.instant(), 30.seconds)
|
||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||
}
|
||||
val flow = NotaryFlow.Client(issueTx)
|
||||
val progressTracker = flow.progressTracker
|
||||
assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep)
|
||||
val progressTrackerDone = getDoneFuture(progressTracker)
|
||||
|
||||
val resultFuture = services.startFlow(flow).resultFuture
|
||||
flow.stateMachine.updateTimedFlowTimeout(1)
|
||||
var exceptionThrown = false
|
||||
try {
|
||||
resultFuture.get(3, TimeUnit.SECONDS)
|
||||
} catch (e: TimeoutException) {
|
||||
exceptionThrown = true
|
||||
}
|
||||
assertTrue(exceptionThrown)
|
||||
val notarySignatures = resultFuture.get(10, TimeUnit.SECONDS)
|
||||
(issueTx + notarySignatures).verifyRequiredSignatures()
|
||||
progressTrackerDone.get()
|
||||
}
|
||||
} finally {
|
||||
waitETA = waitEtaThreshold
|
||||
}
|
||||
}
|
||||
|
||||
private fun TestStartedNode.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction {
|
||||
return services.signInitialTransaction(
|
||||
TransactionBuilder(notary).apply {
|
||||
@ -183,6 +277,10 @@ class TimedFlowTests {
|
||||
}.bufferUntilSubscribed().toBlocking().toFuture()
|
||||
}
|
||||
|
||||
/**
|
||||
* A test notary service that will just stop forever the first time you invoke its commitInputStates method and will succeed the
|
||||
* second time around.
|
||||
*/
|
||||
class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
override val uniquenessProvider = object : UniquenessProvider {
|
||||
/** A dummy commit method that immediately returns a success message. */
|
||||
@ -191,30 +289,27 @@ class TimedFlowTests {
|
||||
set(UniquenessProvider.Result.Success)
|
||||
}
|
||||
}
|
||||
|
||||
override fun getEta(numStates: Int): Duration = waitETA
|
||||
}
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = TestNotaryFlow(otherPartySession, this)
|
||||
@Suspendable
|
||||
override fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>) {
|
||||
val callingFlow = FlowLogic.currentTopLevel
|
||||
?: throw IllegalStateException("This method should be invoked in a flow context.")
|
||||
|
||||
if (requestsReceived.getAndIncrement() == 0) {
|
||||
log.info("Ignoring")
|
||||
// Waiting forever
|
||||
callingFlow.stateMachine.suspend(FlowIORequest.WaitForLedgerCommit(SecureHash.randomSHA256()), false)
|
||||
} else {
|
||||
log.info("Processing")
|
||||
super.commitInputStates(inputs, txId, caller, requestSignature, timeWindow, references)
|
||||
}
|
||||
}
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = NonValidatingNotaryFlow(otherPartySession, this, waitEtaThreshold)
|
||||
override fun start() {}
|
||||
override fun stop() {}
|
||||
}
|
||||
|
||||
/** A notary flow that will yield without returning a response on the very first received request. */
|
||||
private class TestNotaryFlow(otherSide: FlowSession, service: TestNotaryService) : ValidatingNotaryFlow(otherSide, service) {
|
||||
@Suspendable
|
||||
override fun verifyTransaction(requestPayload: NotarisationPayload) {
|
||||
val myIdentity = serviceHub.myInfo.legalIdentities.first()
|
||||
MDC.put("name", myIdentity.name.toString())
|
||||
logger.info("Received a request from ${otherSideSession.counterparty.name}")
|
||||
val stx = requestPayload.signedTransaction
|
||||
subFlow(ResolveTransactionsFlow(stx, otherSideSession))
|
||||
|
||||
if (requestsReceived.getAndIncrement() == 0) {
|
||||
logger.info("Ignoring")
|
||||
// Waiting forever
|
||||
stateMachine.suspend(FlowIORequest.WaitForLedgerCommit(SecureHash.randomSHA256()), false)
|
||||
} else {
|
||||
logger.info("Processing")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -142,6 +142,10 @@ class FlowStateMachineComparatorTest {
|
||||
get() = throw NotImplementedError()
|
||||
override val ourSenderUUID: String?
|
||||
get() = throw NotImplementedError()
|
||||
|
||||
override fun updateTimedFlowTimeout(timeoutSeconds: Long) {
|
||||
throw NotImplementedError()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1,13 +1,16 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.NullKeys
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
@ -18,6 +21,7 @@ import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.TestClock
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
@ -72,6 +76,94 @@ class PersistentUniquenessProviderTests {
|
||||
val error = (response as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
assertEquals(firstTxId.sha256(), conflictCause.hashOfTransactionId)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `rejects transaction with invalid time window`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState1 = generateStateRef()
|
||||
val firstTxId = SecureHash.randomSHA256()
|
||||
val timeWindow = TimeWindow.fromOnly(Clock.systemUTC().instant().plus(30.minutes))
|
||||
val result = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get()
|
||||
val error = (result as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid
|
||||
assertEquals(timeWindow, error.txTimeWindow)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `handles transaction with valid time window`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState1 = generateStateRef()
|
||||
val firstTxId = SecureHash.randomSHA256()
|
||||
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
|
||||
val result = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `handles transaction with valid time window without inputs`() {
|
||||
val testClock = TestClock(Clock.systemUTC())
|
||||
val provider = PersistentUniquenessProvider(testClock, database, TestingNamedCacheFactory())
|
||||
val firstTxId = SecureHash.randomSHA256()
|
||||
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
|
||||
val result = provider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, result)
|
||||
|
||||
// Re-notarisation works outside the specified time window.
|
||||
testClock.advanceBy(90.minutes)
|
||||
val result2 = provider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, result2)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `handles reference states`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState1 = generateStateRef()
|
||||
val inputState2 = generateStateRef()
|
||||
val firstTxId = SecureHash.randomSHA256()
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
|
||||
// Conflict free transaction goes through.
|
||||
val result1 = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, references = listOf(inputState2)).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, result1)
|
||||
|
||||
// Referencing a spent state results in a conflict.
|
||||
val result2 = provider.commit(listOf(inputState2), secondTxId, identity, requestSignature, references = listOf(inputState1)).get()
|
||||
val error = (result2 as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||
val conflictCause = error.consumedStates[inputState1]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||
assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type)
|
||||
|
||||
// Re-notarisation works.
|
||||
val result3 = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, references = listOf(inputState2)).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, result3)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `handles transaction with reference states only`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState1 = generateStateRef()
|
||||
val firstTxId = SecureHash.randomSHA256()
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val thirdTxId = SecureHash.randomSHA256()
|
||||
|
||||
// Conflict free transaction goes through.
|
||||
val result1 = provider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(inputState1)).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, result1)
|
||||
|
||||
// Commit state 1.
|
||||
val result2 = provider.commit(listOf(inputState1), secondTxId, identity, requestSignature).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, result2)
|
||||
|
||||
// Re-notarisation works.
|
||||
val result3 = provider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(inputState1)).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, result3)
|
||||
|
||||
// Transaction referencing the spent sate fails.
|
||||
val result4 = provider.commit(emptyList(), thirdTxId, identity, requestSignature, references = listOf(inputState1)).get()
|
||||
val error = (result4 as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||
val conflictCause = error.consumedStates[inputState1]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, secondTxId.sha256())
|
||||
assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type)
|
||||
}
|
||||
}
|
||||
|
@ -49,6 +49,7 @@ class CheckAllTheTestFlows {
|
||||
notaryCustomOverrides = driverParameters.notaryCustomOverrides,
|
||||
inMemoryDB = driverParameters.inMemoryDB,
|
||||
cordappsForAllNodes = DriverDSLImpl.cordappsInCurrentAndAdditionalPackages(driverParameters.extraCordappPackagesToScan),
|
||||
signCordapps = false,
|
||||
enableSNI = driverParameters.enableSNI
|
||||
)
|
||||
|
||||
|
@ -14,6 +14,11 @@ interface ObjectSerializer : AMQPSerializer<Any> {
|
||||
|
||||
companion object {
|
||||
fun make(typeInformation: LocalTypeInformation, factory: LocalSerializerFactory): ObjectSerializer {
|
||||
if (typeInformation is LocalTypeInformation.NonComposable)
|
||||
throw NotSerializableException(
|
||||
"Trying to build an object serializer for ${typeInformation.typeIdentifier.prettyPrint(false)}, " +
|
||||
"but it is not constructible from its public properties, and so requires a custom serialiser.")
|
||||
|
||||
val typeDescriptor = factory.createDescriptor(typeInformation)
|
||||
val typeNotation = TypeNotationGenerator.getTypeNotation(typeInformation, typeDescriptor)
|
||||
|
||||
|
@ -1,8 +1,13 @@
|
||||
package net.corda.serialization.internal.amqp
|
||||
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.serialization.internal.amqp.testutils.*
|
||||
import org.junit.Test
|
||||
import java.io.NotSerializableException
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFails
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.fail
|
||||
|
||||
// Prior to certain fixes being made within the [PropertySerializaer] classes these simple
|
||||
// deserialization operations would've blown up with type mismatch errors where the deserlized
|
||||
@ -527,5 +532,65 @@ class DeserializeSimpleTypesTests {
|
||||
DeserializationInput(sf2).deserialize(serializedA.obj)
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
class Garbo private constructor(value: Int) {
|
||||
companion object {
|
||||
fun make(value: Int) = Garbo(value)
|
||||
}
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
class Greta(val garbo: Garbo)
|
||||
|
||||
@CordaSerializable
|
||||
class Owner(val value: PropertyWithoutCordaSerializable)
|
||||
|
||||
class PropertyWithoutCordaSerializable(val value: Int)
|
||||
|
||||
@Test
|
||||
fun classHasNoPublicConstructor() {
|
||||
assertFailsWithMessage("Trying to build an object serializer for ${Garbo::class.java.name}, " +
|
||||
"but it is not constructible from its public properties, and so requires a custom serialiser.") {
|
||||
TestSerializationOutput(VERBOSE, sf1).serializeAndReturnSchema(Garbo.make(1))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun propertyClassHasNoPublicConstructor() {
|
||||
assertFailsWithMessage("Trying to build an object serializer for ${Greta::class.java.name}, " +
|
||||
"but it is not constructible from its public properties, and so requires a custom serialiser.") {
|
||||
TestSerializationOutput(VERBOSE, sf1).serializeAndReturnSchema(Greta(Garbo.make(1)))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun notWhitelistedError() {
|
||||
val factory = testDefaultFactoryWithWhitelist()
|
||||
assertFailsWithMessage(
|
||||
"Class \"class ${PropertyWithoutCordaSerializable::class.java.name}\" " +
|
||||
"is not on the whitelist or annotated with @CordaSerializable.") {
|
||||
TestSerializationOutput(VERBOSE, factory).serialize(PropertyWithoutCordaSerializable(1))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun propertyClassNotWhitelistedError() {
|
||||
val factory = testDefaultFactoryWithWhitelist()
|
||||
assertFailsWithMessage(
|
||||
"Class \"class ${PropertyWithoutCordaSerializable::class.java.name}\" " +
|
||||
"is not on the whitelist or annotated with @CordaSerializable.") {
|
||||
TestSerializationOutput(VERBOSE, factory).serialize(Owner(PropertyWithoutCordaSerializable(1)))
|
||||
}
|
||||
}
|
||||
|
||||
private fun assertFailsWithMessage(expectedMessage: String, block: () -> Unit) {
|
||||
try {
|
||||
block()
|
||||
fail("Expected an exception, but none was thrown")
|
||||
} catch (e: Exception) {
|
||||
assertEquals(expectedMessage, e.message)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -345,6 +345,7 @@ fun <A> driver(defaultParameters: DriverParameters = DriverParameters(), dsl: Dr
|
||||
notaryCustomOverrides = defaultParameters.notaryCustomOverrides,
|
||||
inMemoryDB = defaultParameters.inMemoryDB,
|
||||
cordappsForAllNodes = defaultParameters.cordappsForAllNodes(),
|
||||
signCordapps = false,
|
||||
enableSNI = defaultParameters.enableSNI
|
||||
),
|
||||
coerce = { it },
|
||||
|
@ -92,6 +92,7 @@ class DriverDSLImpl(
|
||||
val notaryCustomOverrides: Map<String, Any?>,
|
||||
val inMemoryDB: Boolean,
|
||||
val cordappsForAllNodes: Collection<TestCordapp>,
|
||||
val signCordapps: Boolean,
|
||||
val enableSNI: Boolean
|
||||
) : InternalDriverDSL {
|
||||
|
||||
@ -271,7 +272,7 @@ class DriverDSLImpl(
|
||||
return registrationFuture.flatMap {
|
||||
networkMapAvailability.flatMap {
|
||||
// But starting the node proper does require the network map
|
||||
startRegisteredNode(name, it, rpcUsers, verifierType, customOverrides, startInSameProcess, maximumHeapSize, p2pAddress, additionalCordapps, regenerateCordappsOnStart, flowOverrides, bytemanPort)
|
||||
startRegisteredNode(name, it, rpcUsers, verifierType, customOverrides, startInSameProcess, maximumHeapSize, p2pAddress, additionalCordapps, regenerateCordappsOnStart, flowOverrides, signCordapps, bytemanPort)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -287,6 +288,7 @@ class DriverDSLImpl(
|
||||
additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
regenerateCordappsOnStart: Boolean = false,
|
||||
flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>> = emptyMap(),
|
||||
signCordapps: Boolean = false,
|
||||
bytemanPort: Int? = null): CordaFuture<NodeHandle> {
|
||||
val rpcAddress = portAllocation.nextHostAndPort()
|
||||
val rpcAdminAddress = portAllocation.nextHostAndPort()
|
||||
@ -319,7 +321,7 @@ class DriverDSLImpl(
|
||||
allowMissingConfig = true,
|
||||
configOverrides = if (overrides.hasPath("devMode")) overrides else overrides + mapOf("devMode" to true)
|
||||
)).checkAndOverrideForInMemoryDB()
|
||||
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize, localNetworkMap, additionalCordapps, regenerateCordappsOnStart, bytemanPort)
|
||||
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize, localNetworkMap, additionalCordapps, regenerateCordappsOnStart, signCordapps, bytemanPort)
|
||||
}
|
||||
|
||||
private fun startNodeRegistration(
|
||||
@ -628,6 +630,7 @@ class DriverDSLImpl(
|
||||
localNetworkMap: LocalNetworkMap?,
|
||||
additionalCordapps: Collection<TestCordapp>,
|
||||
regenerateCordappsOnStart: Boolean = false,
|
||||
signCordapps: Boolean = false,
|
||||
bytemanPort: Int? = null): CordaFuture<NodeHandle> {
|
||||
val visibilityHandle = networkVisibilityController.register(specifiedConfig.corda.myLegalName)
|
||||
val baseDirectory = specifiedConfig.corda.baseDirectory.createDirectories()
|
||||
@ -654,7 +657,7 @@ class DriverDSLImpl(
|
||||
val appOverrides = additionalCordapps.map { it.name to it.version}.toSet()
|
||||
val baseCordapps = cordappsForAllNodes.filter { !appOverrides.contains(it.name to it.version) }
|
||||
|
||||
val cordappDirectories = existingCorDappDirectoriesOption + (baseCordapps + additionalCordapps).map { TestCordappDirectories.getJarDirectory(it).toString() }
|
||||
val cordappDirectories = existingCorDappDirectoriesOption + (baseCordapps + additionalCordapps).map { TestCordappDirectories.getJarDirectory(it, signJar = signCordapps).toString() }
|
||||
|
||||
val config = NodeConfig(specifiedConfig.typesafe.withValue(NodeConfiguration.cordappDirectoriesKey, ConfigValueFactory.fromIterable(cordappDirectories.toSet())))
|
||||
|
||||
@ -1133,6 +1136,7 @@ fun <DI : DriverDSL, D : InternalDriverDSL, A> genericDriver(
|
||||
notaryCustomOverrides = defaultParameters.notaryCustomOverrides,
|
||||
inMemoryDB = defaultParameters.inMemoryDB,
|
||||
cordappsForAllNodes = defaultParameters.cordappsForAllNodes(),
|
||||
signCordapps = false,
|
||||
enableSNI = defaultParameters.enableSNI
|
||||
)
|
||||
)
|
||||
@ -1227,6 +1231,7 @@ fun <A> internalDriver(
|
||||
notaryCustomOverrides: Map<String, Any?> = DriverParameters().notaryCustomOverrides,
|
||||
inMemoryDB: Boolean = DriverParameters().inMemoryDB,
|
||||
cordappsForAllNodes: Collection<TestCordapp> = DriverParameters().cordappsForAllNodes(),
|
||||
signCordapps: Boolean = false,
|
||||
enableSNI: Boolean = DriverParameters().enableSNI,
|
||||
dsl: DriverDSLImpl.() -> A
|
||||
): A {
|
||||
@ -1247,6 +1252,7 @@ fun <A> internalDriver(
|
||||
notaryCustomOverrides = notaryCustomOverrides,
|
||||
inMemoryDB = inMemoryDB,
|
||||
cordappsForAllNodes = cordappsForAllNodes,
|
||||
signCordapps = signCordapps,
|
||||
enableSNI = enableSNI
|
||||
),
|
||||
coerce = { it },
|
||||
|
@ -2,6 +2,7 @@ package net.corda.testing.node.internal
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.PLATFORM_VERSION
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
@ -243,7 +244,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration,
|
||||
return InMemoryReceivedMessage(
|
||||
message.topic,
|
||||
OpaqueBytes(message.data.bytes.copyOf()), // Kryo messes with the buffer so give each client a unique copy
|
||||
1,
|
||||
PLATFORM_VERSION,
|
||||
message.uniqueMessageId,
|
||||
message.debugTimestamp,
|
||||
sender.name
|
||||
|
@ -142,6 +142,7 @@ fun <A> rpcDriver(
|
||||
notaryCustomOverrides = notaryCustomOverrides,
|
||||
inMemoryDB = inMemoryDB,
|
||||
cordappsForAllNodes = cordappsForAllNodes,
|
||||
signCordapps = false,
|
||||
enableSNI = enableSNI
|
||||
), externalTrace
|
||||
),
|
||||
|
@ -2,12 +2,11 @@ package net.corda.testing.node.internal
|
||||
|
||||
import com.typesafe.config.ConfigValueFactory
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.deleteRecursively
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.writeText
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.testing.core.JarSignatureTestUtils.signJar
|
||||
import net.corda.testing.core.JarSignatureTestUtils.generateKey
|
||||
import net.corda.testing.node.TestCordapp
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
@ -21,7 +20,8 @@ object TestCordappDirectories {
|
||||
|
||||
private val testCordappsCache = ConcurrentHashMap<TestCordappImpl, Path>()
|
||||
|
||||
fun getJarDirectory(cordapp: TestCordapp, cordappsDirectory: Path = defaultCordappsDirectory): Path {
|
||||
//TODO In future, we may wish to associate a signer attribute to TestCordapp interface itself, and trigger signing from that.
|
||||
fun getJarDirectory(cordapp: TestCordapp, cordappsDirectory: Path = defaultCordappsDirectory, signJar: Boolean = false): Path {
|
||||
cordapp as TestCordappImpl
|
||||
return testCordappsCache.computeIfAbsent(cordapp) {
|
||||
val configString = ConfigValueFactory.fromMap(cordapp.config).toConfig().root().render()
|
||||
@ -37,6 +37,17 @@ object TestCordappDirectories {
|
||||
val configDir = (cordappDir / "config").createDirectories()
|
||||
val jarFile = cordappDir / "$filename.jar"
|
||||
cordapp.packageAsJar(jarFile)
|
||||
//TODO in future we may extend the signing with user-defined key-stores/certs/keys.
|
||||
if (signJar) {
|
||||
val testKeystore = "_teststore"
|
||||
val alias = "Test"
|
||||
val pwd = "secret!"
|
||||
if (!(cordappsDirectory / testKeystore).exists()) {
|
||||
cordappsDirectory.generateKey(alias, pwd, "O=Test Company Ltd,OU=Test,L=London,C=GB")
|
||||
}
|
||||
(cordappsDirectory / testKeystore).copyTo(cordappDir / testKeystore)
|
||||
cordappDir.signJar("$filename.jar", alias, pwd)
|
||||
}
|
||||
(configDir / "$filename.conf").writeText(configString)
|
||||
logger.debug { "$cordapp packaged into $jarFile" }
|
||||
cordappDir
|
||||
|
Loading…
x
Reference in New Issue
Block a user