Merge remote-tracking branch 'open/master' into shams-os-merge-080118

# Conflicts:
#	node/src/integration-test/kotlin/net/corda/node/services/AttachmentLoadingTests.kt
#	node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt
#	node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt
This commit is contained in:
Shams Asari 2018-01-08 13:43:10 +00:00
commit a94f99781a
22 changed files with 1303 additions and 386 deletions

View File

@ -13,8 +13,22 @@ import javax.xml.bind.DatatypeConverter
*/
@CordaSerializable
sealed class ByteSequence : Comparable<ByteSequence> {
constructor() {
this._bytes = COPY_BYTES
}
/**
* The underlying bytes.
* This constructor allows to bypass calls to [bytes] for functions in this class if the implementation
* of [bytes] makes a copy of the underlying [ByteArray] (as [OpaqueBytes] does for safety). This improves
* performance. It is recommended to use this constructor rather than the default constructor.
*/
constructor(uncopiedBytes: ByteArray) {
this._bytes = uncopiedBytes
}
/**
* The underlying bytes. Some implementations may choose to make a copy of the underlying [ByteArray] for
* security reasons. For example, [OpaqueBytes].
*/
abstract val bytes: ByteArray
/**
@ -26,8 +40,11 @@ sealed class ByteSequence : Comparable<ByteSequence> {
*/
abstract val offset: Int
private val _bytes: ByteArray
get() = if (field === COPY_BYTES) bytes else field
/** Returns a [ByteArrayInputStream] of the bytes */
fun open() = ByteArrayInputStream(bytes, offset, size)
fun open() = ByteArrayInputStream(_bytes, offset, size)
/**
* Create a sub-sequence backed by the same array.
@ -38,19 +55,22 @@ sealed class ByteSequence : Comparable<ByteSequence> {
fun subSequence(offset: Int, size: Int): ByteSequence {
require(offset >= 0)
require(offset + size <= this.size)
// Intentionally use bytes rather than _bytes, to mirror the copy-or-not behaviour of that property.
return if (offset == 0 && size == this.size) this else of(bytes, this.offset + offset, size)
}
companion object {
/**
* Construct a [ByteSequence] given a [ByteArray] and optional offset and size, that represents that potentially
* sub-sequence of bytes. The returned implementation is optimised when the whole [ByteArray] is the sequence.
* sub-sequence of bytes.
*/
@JvmStatic
@JvmOverloads
fun of(bytes: ByteArray, offset: Int = 0, size: Int = bytes.size): ByteSequence {
return if (offset == 0 && size == bytes.size && size != 0) OpaqueBytes(bytes) else OpaqueBytesSubSequence(bytes, offset, size)
return OpaqueBytesSubSequence(bytes, offset, size)
}
private val COPY_BYTES: ByteArray = ByteArray(0)
}
/**
@ -65,7 +85,7 @@ sealed class ByteSequence : Comparable<ByteSequence> {
* Copy this sequence, complete with new backing array. This can be helpful to break references to potentially
* large backing arrays from small sub-sequences.
*/
fun copy(): ByteSequence = of(bytes.copyOfRange(offset, offset + size))
fun copy(): ByteSequence = of(_bytes.copyOfRange(offset, offset + size))
/**
* Compare byte arrays byte by byte. Arrays that are shorter are deemed less than longer arrays if all the bytes
@ -73,10 +93,12 @@ sealed class ByteSequence : Comparable<ByteSequence> {
*/
override fun compareTo(other: ByteSequence): Int {
val min = minOf(this.size, other.size)
val thisBytes = this._bytes
val otherBytes = other._bytes
// Compare min bytes
for (index in 0 until min) {
val unsignedThis = java.lang.Byte.toUnsignedInt(this.bytes[this.offset + index])
val unsignedOther = java.lang.Byte.toUnsignedInt(other.bytes[other.offset + index])
val unsignedThis = java.lang.Byte.toUnsignedInt(thisBytes[this.offset + index])
val unsignedOther = java.lang.Byte.toUnsignedInt(otherBytes[other.offset + index])
if (unsignedThis != unsignedOther) {
return Integer.signum(unsignedThis - unsignedOther)
}
@ -89,7 +111,7 @@ sealed class ByteSequence : Comparable<ByteSequence> {
if (this === other) return true
if (other !is ByteSequence) return false
if (this.size != other.size) return false
return subArraysEqual(this.bytes, this.offset, this.size, other.bytes, other.offset)
return subArraysEqual(this._bytes, this.offset, this.size, other._bytes, other.offset)
}
private fun subArraysEqual(a: ByteArray, aOffset: Int, length: Int, b: ByteArray, bOffset: Int): Boolean {
@ -103,14 +125,15 @@ sealed class ByteSequence : Comparable<ByteSequence> {
}
override fun hashCode(): Int {
val thisBytes = _bytes
var result = 1
for (index in offset until (offset + size)) {
result = 31 * result + bytes[index]
result = 31 * result + thisBytes[index]
}
return result
}
override fun toString(): String = "[${bytes.copyOfRange(offset, offset + size).toHexString()}]"
override fun toString(): String = "[${_bytes.copyOfRange(offset, offset + size).toHexString()}]"
}
/**
@ -118,7 +141,7 @@ sealed class ByteSequence : Comparable<ByteSequence> {
* In an ideal JVM this would be a value type and be completely overhead free. Project Valhalla is adding such
* functionality to Java, but it won't arrive for a few years yet!
*/
open class OpaqueBytes(bytes: ByteArray) : ByteSequence() {
open class OpaqueBytes(bytes: ByteArray) : ByteSequence(bytes) {
companion object {
/**
* Create [OpaqueBytes] from a sequence of [Byte] values.
@ -147,7 +170,7 @@ open class OpaqueBytes(bytes: ByteArray) : ByteSequence() {
}
/**
* Copy [size] bytes from this [ByteArray] starting from [offset] into a new [ByteArray].
* Wrap [size] bytes from this [ByteArray] starting from [offset] into a new [ByteArray].
*/
fun ByteArray.sequence(offset: Int = 0, size: Int = this.size) = ByteSequence.of(this, offset, size)
@ -165,7 +188,7 @@ fun String.parseAsHex(): ByteArray = DatatypeConverter.parseHexBinary(this)
/**
* Class is public for serialization purposes
*/
class OpaqueBytesSubSequence(override val bytes: ByteArray, override val offset: Int, override val size: Int) : ByteSequence() {
class OpaqueBytesSubSequence(override val bytes: ByteArray, override val offset: Int, override val size: Int) : ByteSequence(bytes) {
init {
require(offset >= 0 && offset < bytes.size)
require(size >= 0 && size <= bytes.size)

View File

@ -0,0 +1,237 @@
Default Class Evolution
=======================
.. contents::
Whilst more complex evolutionary modifications to classes require annotating, Corda's serialization
framework supports several minor modifications to classes without any external modification save
the actual code changes. These are:
#. Adding nullable properties
#. Adding non nullable properties if, and only if, an annotated constructor is provided
#. Removing properties
#. Reordering constructor parameters
Adding Nullable Properties
--------------------------
The serialization framework allows nullable properties to be freely added. For example:
.. container:: codeset
.. sourcecode:: kotlin
// Initial instance of the class
data class Example1 (val a: Int, b: String) // (Version A)
// Class post addition of property c
data class Example1 (val a: Int, b: String, c: Int?) // (Version B)
A node with version A of class ``Example1`` will be able to deserialize a blob serialized by a node with it
at version B as the framework would treat it as a removed property.
A node with the class at version B will be able to deserialize a serialized version A of ``Example1`` without
any modification as the property is nullable and will thus provide null to the constructor.
Adding Non Nullable Properties
------------------------------
If a non null property is added, unlike nullable properties, some additional code is required for
this to work. Consider a similar example to our nullable example above
.. container:: codeset
.. sourcecode:: kotlin
// Initial instance of the class
data class Example2 (val a: Int, b: String) // (Version A)
// Class post addition of property c
data class Example1 (val a: Int, b: String, c: Int) { // (Version B)
@DeprecatedConstructorForDeserialization(1)
constructor (a: Int, b: String) : this(a, b, 0) // 0 has been determined as a sensible default
}
For this to work we have had to add a new constructor that allows nodes with the class at version B to create an
instance from serialised form of that class from an older version, in this case version A as per our example
above. A sensible default for the missing value is provided for instantiation of the non null property.
.. note:: The ``@DeprecatedConstructorForDeserialization`` annotation is important, this signifies to the
serialization framework that this constructor should be considered for building instances of the
object when evolution is required.
Furthermore, the integer parameter passed to the constructor if the annotation indicates a precedence
order, see the discussion below.
As before, instances of the class at version A will be able to deserialize serialized forms of example B as it
will simply treat them as if the property has been removed (as from its perspective, they will have been.)
Constructor Versioning
~~~~~~~~~~~~~~~~~~~~~~
If, over time, multiple non nullable properties are added, then a class will potentially have to be able
to deserialize a number of different forms of the class. Being able to select the correct constructor is
important to ensure the maximum information is extracted.
Consider this example:
.. container:: codeset
.. sourcecode:: kotlin
// The original version of the class
data class Example3 (val a: Int, val b: Int)
.. container:: codeset
.. sourcecode:: kotlin
// The first alteration, property c added
data class Example3 (val a: Int, val b: Int, val c: Int)
.. container:: codeset
.. sourcecode:: kotlin
// The second alteration, property d added
data class Example3 (val a: Int, val b: Int, val c: Int, val d: Int)
.. container:: codeset
.. sourcecode:: kotlin
// The third alteration, and how it currently exists, property e added
data class Example3 (val a: Int, val b: Int, val c: Int, val d: Int, val: Int e) {
// NOTE: version number purposefully omitted from annotation for demonstration purposes
@DeprecatedConstructorForDeserialization
constructor (a: Int, b: Int) : this(a, b, -1, -1, -1) // alt constructor 1
@DeprecatedConstructorForDeserialization
constructor (a: Int, b: Int, c: Int) : this(a, b, c, -1, -1) // alt constructor 2
@DeprecatedConstructorForDeserialization
constructor (a: Int, b: Int, c: Int, d) : this(a, b, c, d, -1) // alt constructor 3
}
In this case, the deserializer has to be able to deserialize instances of class ``Example3`` that were serialized as, for example:
.. container:: codeset
.. sourcecode:: kotlin
Example3 (1, 2) // example I
Example3 (1, 2, 3) // example II
Example3 (1, 2, 3, 4) // example III
Example3 (1, 2, 3, 4, 5) // example IV
Examples I, II, and III would require evolution and thus selection of constructor. Now, with no versioning applied there
is ambiguity as to which constructor should be used. For example, example II could use 'alt constructor 2' which matches
it's arguments most tightly or 'alt constructor 1' and not instantiate parameter c.
``constructor (a: Int, b: Int, c: Int) : this(a, b, c, -1, -1)``
or
``constructor (a: Int, b: Int) : this(a, b, -1, -1, -1)``
Whilst it may seem trivial which should be picked, it is still ambiguous, thus we use a versioning number in the constructor
annotation which gives a strict precedence order to constructor selection. Therefore, the proper form of the example would
be:
.. container:: codeset
.. sourcecode:: kotlin
// The third alteration, and how it currently exists, property e added
data class Example3 (val a: Int, val b: Int, val c: Int, val d: Int, val: Int e) {
// NOTE: version number purposefully omitted from annotation for demonstration purposes
@DeprecatedConstructorForDeserialization(1)
constructor (a: Int, b: Int) : this(a, b, -1, -1, -1) // alt constructor 1
@DeprecatedConstructorForDeserialization(2)
constructor (a: Int, b: Int, c: Int) : this(a, b, c, -1, -1) // alt constructor 2
@DeprecatedConstructorForDeserialization(3)
constructor (a: Int, b: Int, c: Int, d) : this(a, b, c, d, -1) // alt constructor 3
}
Constructors are selected in strict descending order taking the one that enables construction. So, deserializing examples I to IV would
give us:
.. container:: codeset
.. sourcecode:: kotlin
Example3 (1, 2, -1, -1, -1) // example I
Example3 (1, 2, 3, -1, -1) // example II
Example3 (1, 2, 3, 4, -1) // example III
Example3 (1, 2, 3, 4, 5) // example IV
Removing Properties
-------------------
Property removal is effectively a mirror of adding properties (both nullable and non nullable) given that this functionality
is required to facilitate the addition of properties. When this state is detected by the serialization framework, properties
that don't have matching parameters in the main constructor are simply omitted from object construction.
.. container:: codeset
.. sourcecode:: kotlin
// Initial instance of the class
data class Example4 (val a: Int?, val b: String?, val c: Int?) // (Version A)
// Class post removal of property 'a'
data class Example4 (val b: String?, c: Int?) // (Version B)
In practice, what this means is removing nullable properties is possible. However, removing non nullable properties isn't because
a node receiving a message containing a serialized form of an object with fewer properties than it requires for construction has
no capacity to guess at what values should or could be used as sensible defaults. When those properties are nullable it simply sets
them to null.
Reordering Constructor Parameter Order
--------------------------------------
Properties (in Kotlin this corresponds to constructor parameters) may be reordered freely. The evolution serializer will create a
mapping between how a class was serialized and its current constructor parameter order. This is important to our AMQP framework as it
constructs objects using their primary (or annotated) constructor. The ordering of whose parameters will have determined the way
an object's properties were serialised into the byte stream.
For an illustrative example consider a simple class:
.. Container:: codeset
.. sourcecode:: kotlin
data class Example5 (val a: Int, val b: String)
val e = Example5(999, "hello")
When we serialize ``e`` its properties will be encoded in order of its primary constructors parameters, so:
``999,hello``
Were those parameters to be reordered post serialisation then deserializing, without evolution, would fail with a basic
type error as we'd attempt to create the new value of ``Example5`` with the values provided in the wrong order:
.. Container:: codeset
.. sourcecode:: kotlin
// changed post serialisation
data class Example5 (val b: String, val a: Int)
.. sourcecode:: shell
| 999 | hello | <--- Extract properties to pass to constructor from byte stream
| |
| +--------------------------+
+--------------------------+ |
| |
deserializedValue = Example5(999, "hello") <--- Resulting attempt at construction
| |
| \
| \ <--- Will clearly fail as 999 is not a
| \ string and hello is not an integer
data class Example5 (val b: String, val a: Int)

View File

@ -0,0 +1,335 @@
Enum Evolution
==============
.. contents::
In the continued development of a CorDapp an enumerated type that was fit for purpose at one time may
require changing. Normally, this would be problematic as anything serialised (and kept in a vault) would
run the risk of being unable to be deserialized in the future or older versions of the app still alive
within a compatibility zone may fail to deserialize a message.
To facilitate backward and forward support for alterations to enumerated types Corda's serialization
framework supports the evolution of such types through a well defined framework that allows different
versions to interoperate with serialised versions of an enumeration of differing versions.
This is achieved through the use of certain annotations. Whenever a change is made, an annotation
capturing the change must be added (whilst it can be omitted any interoperability will be lost). Corda
supports two modifications to enumerated types, adding new constants, and renaming existing constants
.. warning:: Once added evolution annotations MUST NEVER be removed from a class, doing so will break
both forward and backward compatibility for this version of the class and any version moving
forward
The Purpose of Annotating Changes
---------------------------------
The biggest hurdle to allowing enum constants to be changed is that there will exist instances of those
classes, either serialized in a vault or on nodes with the old, unmodified, version of the class that we
must be able to interoperate with. Thus if a received data structure references an enum assigned a constant
value that doesn't exist on the running JVM, a solution is needed.
For this, we use the annotations to allow developers to express their backward compatible intentions.
In the case of renaming constants this is somewhat obvious, the deserializing node will simply treat any
constants it doesn't understand as their "old" values, i.e. those values that it currently knows about.
In the case of adding new constants the developer must chose which constant (that existed *before* adding
the new one) a deserializing system should treat any instances of the new one as.
.. note:: Ultimately, this may mean some design compromises are required. If an enumeration is
planned as being often extended and no sensible defaults will exist then including a constant
in the original version of the class that all new additions can default to may make sense
Evolution Transmission
----------------------
An object serializer, on creation, will inspect the class it represents for any evolution annotations.
If a class is thus decorated those rules will be encoded as part of any serialized representation of a
data structure containing that class. This ensures that on deserialization the deserializing object will
have access to any transformative rules it needs to build a local instance of the serialized object.
Evolution Precedence
--------------------
On deserialization (technically on construction of a serialization object that facilitates serialization
and deserialization) a class's fingerprint is compared to the fingerprint received as part of the AMQP
header of the corresponding class. If they match then we are sure that the two class versions are functionally
the same and no further steps are required save the deserialization of the serialized information into an instance
of the class.
If, however, the fingerprints differ then we know that the class we are attempting to deserialize is different
than the version we will be deserializing it into. What we cannot know is which version is newer, at least
not by examining the fingerprint
.. note:: Corda's AMQP fingerprinting for enumerated types include the type name and the enum constants
Newer vs older is important as the deserializer needs to use the more recent set of transforms to ensure it
can transform the serialised object into the form as it exists in the deserializer. Newness is determined simply
by length of the list of all transforms. This is sufficient as transform annotations should only ever be added
.. warning:: technically there is nothing to prevent annotations being removed in newer versions. However,
this will break backward compatibility and should thus be avoided unless a rigorous upgrade procedure
is in place to cope with all deployed instances of the class and all serialised versions existing
within vaults.
Thus, on deserialization, there will be two options to chose from in terms of transformation rules
#. Determined from the local class and the annotations applied to it (the local copy)
#. Parsed from the AMQP header (the remote copy)
Which set is used will simply be the largest.
Renaming Constants
------------------
Renamed constants are marked as such with the ``@CordaSerializationTransformRenames`` meta annotation that
wraps a list of ``@CordaSerializationTransformRename`` annotations. Each rename requiring an instance in the
list.
Each instance must provide the new name of the constant as well as the old. For example, consider the following enumeration:
.. container:: codeset
.. sourcecode:: kotlin
enum class Example {
A, B, C
}
If we were to rename constant C to D this would be done as follows:
.. container:: codeset
.. sourcecode:: kotlin
@CordaSerializationTransformRenames (
CordaSerializationTransformRename("D", "C")
)
enum class Example {
A, B, D
}
.. note:: The parameters to the ``CordaSerializationTransformRename`` annotation are defined as 'to' and 'from,
so in the above example it can be read as constant D (given that is how the class now exists) was renamed
from C
In the case where a single rename has been applied the meta annotation may be omitted. Thus, the following is
functionally identical to the above:
.. container:: codeset
.. sourcecode:: kotlin
@CordaSerializationTransformRename("D", "C")
enum class Example {
A, B, D
}
However, as soon as a second rename is made the meta annotation must be used. For example, if at some time later
B is renamed to E:
.. container:: codeset
.. sourcecode:: kotlin
@CordaSerializationTransformRenames (
CordaSerializationTransformRename(from = "B", to = "E"),
CordaSerializationTransformRename(from = "C", to = "D")
)
enum class Example {
A, E, D
}
Rules
~~~~~
#. A constant cannot be renamed to match an existing constant, this is enforced through language constraints
#. A constant cannot be renamed to a value that matches any previous name of any other constant
If either of these covenants are inadvertently broken, a ``NotSerializableException`` will be thrown on detection
by the serialization engine as soon as they are detected. Normally this will be the first time an object doing
so is serialized. However, in some circumstances, it could be at the point of deserialization.
Adding Constants
----------------
Enumeration constants can be added with the ``@CordaSerializationTransformEnumDefaults`` meta annotation that
wraps a list of ``CordaSerializationTransformEnumDefault`` annotations. For each constant added an annotation
must be included that signifies, on deserialization, which constant value should be used in place of the
serialised property if that value doesn't exist on the version of the class as it exists on the deserializing
node.
.. container:: codeset
.. sourcecode:: kotlin
enum class Example {
A, B, C
}
If we were to add the constant D
.. container:: codeset
.. sourcecode:: kotlin
@CordaSerializationTransformEnumDefaults (
CordaSerializationTransformEnumDefault("D", "C")
)
enum class Example {
A, B, C, D
}
.. note:: The parameters to the ``CordaSerializationTransformEnumDefault`` annotation are defined as 'new' and 'old',
so in the above example it can be read as constant D should be treated as constant C if you, the deserializing
node, don't know anything about constant D
.. note:: Just as with the ``CordaSerializationTransformRename`` transformation if a single transform is being applied
then the meta transform may be omitted.
.. container:: codeset
.. sourcecode:: kotlin
@CordaSerializationTransformEnumDefault("D", "C")
enum class Example {
A, B, C, D
}
New constants may default to any other constant older than them, including constants that have also been added
since inception. In this example, having added D (above) we add the constant E and chose to default it to D
.. container:: codeset
.. sourcecode:: kotlin
@CordaSerializationTransformEnumDefaults (
CordaSerializationTransformEnumDefault("E", "D"),
CordaSerializationTransformEnumDefault("D", "C")
)
enum class Example {
A, B, C, D, E
}
.. note:: Alternatively, we could have decided both new constants should have been defaulted to the first
element
.. sourcecode:: kotlin
@CordaSerializationTransformEnumDefaults (
CordaSerializationTransformEnumDefault("E", "A"),
CordaSerializationTransformEnumDefault("D", "A")
)
enum class Example {
A, B, C, D, E
}
When deserializing the most applicable transform will be applied. Continuing the above example, deserializing
nodes could have three distinct views on what the enum Example looks like (annotations omitted for brevity)
.. container:: codeset
.. sourcecode:: kotlin
// The original version of the class. Will deserialize: -
// A -> A
// B -> B
// C -> C
// D -> C
// E -> C
enum class Example {
A, B, C
}
.. sourcecode:: kotlin
// The class as it existed after the first addition. Will deserialize:
// A -> A
// B -> B
// C -> C
// D -> D
// E -> D
enum class Example {
A, B, C, D
}
.. sourcecode:: kotlin
// The current state of the class. All values will deserialize as themselves
enum class Example {
A, B, C, D, E
}
Thus, when deserializing a value that has been encoded as E could be set to one of three constants (E, D, and C)
depending on how the deserializing node understands the class.
Rules
~~~~~
#. New constants must be added to the end of the existing list of constants
#. Defaults can only be set to "older" constants, i.e. those to the left of the new constant in the list
#. Constants must never be removed once added
#. New constants can be renamed at a later date using the appropriate annotation
#. When renamed, if a defaulting annotation refers to the old name, it should be left as is
Combining Evolutions
---------------------
Renaming constants and adding constants can be combined over time as a class changes freely. Added constants can
in turn be renamed and everything will continue to be deserializeable. For example, consider the following enum:
.. container:: codeset
.. sourcecode:: kotlin
enum class OngoingExample { A, B, C }
For the first evolution, two constants are added, D and E, both of which are set to default to C when not present
.. container:: codeset
.. sourcecode:: kotlin
@CordaSerializationTransformEnumDefaults (
CordaSerializationTransformEnumDefault("E", "C"),
CordaSerializationTransformEnumDefault("D", "C")
)
enum class OngoingExample { A, B, C, D, E }
Then lets assume constant C is renamed to CAT
.. container:: codeset
.. sourcecode:: kotlin
@CordaSerializationTransformEnumDefaults (
CordaSerializationTransformEnumDefault("E", "C"),
CordaSerializationTransformEnumDefault("D", "C")
)
@CordaSerializationTransformRename("C", "CAT")
enum class OngoingExample { A, B, CAT, D, E }
Note how the first set of modifications still reference C, not CAT. This is as it should be and will
continue to work as expected.
Subsequently is is fine to add an additional new constant that references the renamed value.
.. container:: codeset
.. sourcecode:: kotlin
@CordaSerializationTransformEnumDefaults (
CordaSerializationTransformEnumDefault("F", "CAT"),
CordaSerializationTransformEnumDefault("E", "C"),
CordaSerializationTransformEnumDefault("D", "C")
)
@CordaSerializationTransformRename("C", "CAT")
enum class OngoingExample { A, B, CAT, D, E, F }
Unsupported Evolutions
----------------------
The following evolutions are not currently supports
#. Removing constants
#. Reordering constants

View File

@ -47,13 +47,12 @@ It's reproduced here as an example of both ways you can do this for a couple of
AMQP
====
.. note:: AMQP serialization is not currently live and will be turned on in a future release.
The long term goal is to migrate the current serialization format for everything except checkpoints away from the current
``Kryo``-based format to a more sustainable, self-describing and controllable format based on AMQP 1.0. The primary drivers for that move are:
Originally Corda used a ``Kryo``-based serialization scheme throughout for all serialization contexts. However, it was realised there
was a compelling use case for the definition and development of a custom format based upon AMQP 1.0. The primary drivers for this were
#. A desire to have a schema describing what has been serialized along-side the actual data:
#. To assist with versioning, both in terms of being able to interpret long ago archived data (e.g. trades from
#. To assist with versioning, both in terms of being able to interpret long ago archivEd data (e.g. trades from
a decade ago, long after the code has changed) and between differing code versions.
#. To make it easier to write user interfaces that can navigate the serialized form of data.
#. To support cross platform (non-JVM) interaction, where the format of a class file is not so easily interpreted.
@ -65,7 +64,24 @@ The long term goal is to migrate the current serialization format for everything
data poked directly into their fields without an opportunity to validate consistency or intercept attempts to manipulate
supposed invariants.
Documentation on that format, and how JVM classes are translated to AMQP, will be linked here when it is available.
Delivering this is an ongoing effort by the Corda development team. At present, the ``Kryo``-based format is still used by the RPC framework on
both the client and server side. However, it is planned that this will move to the AMQP framework when ready.
The AMQP framework is currently used for:
#. The peer to peer context, representing inter-node communication.
#. The persistence layer, representing contract states persisted into the vault.
Finally, for the checkpointing of flows Corda will continue to use the existing ``Kryo`` scheme.
This separation of serialization schemes into different contexts allows us to use the most suitable framework for that context rather than
attempting to force a one size fits all approach. Where ``Kryo`` is more suited to the serialization of a programs stack frames, being more flexible
than our AMQP framework in what it can construct and serialize, that flexibility makes it exceptionally difficult to make secure. Conversly
our AMQP framework allows us to concentrate on a robust a secure framework that can be reasoned about thus made safer with far fewer unforeseen
security holes.
.. note:: Selection of serialization context should, for the most part, be opaque to CorDapp developers, the Corda framework selecting
the correct context as confugred.
.. For information on our choice of AMQP 1.0, see :doc:`amqp-choice`. For detail on how we utilise AMQP 1.0 and represent
objects in AMQP types, see :doc:`amqp-format`.
@ -210,36 +226,115 @@ Here are the rules to adhere to for support of your own types:
Classes
```````
General Rules
'''''''''''''
#. The class must be compiled with parameter names included in the ``.class`` file. This is the default in Kotlin
but must be turned on in Java (``-parameters`` command line option to ``javac``).
#. The class is annotated with ``@CordaSerializable``.
#. The declared types of constructor arguments, getters, and setters must be supported, and where generics are used the
generic parameter must be a supported type, an open wildcard (``*``), or a bounded wildcard which is currently
widened to an open wildcard.
#. Any superclass must adhere to the same rules, but can be abstract.
#. Object graph cycles are not supported, so an object cannot refer to itself, directly or indirectly.
Constructor Instantiation
'''''''''''''''''''''''''
The primary way the AMQP serialization framework for Corda instantiates objects is via a defined constructor. This is
used to first determine which properties of an object are to be serialised then, on deserialization, it is used to
instantiate the object with the serialized values.
This is the recommended design idiom for serializable objects in Corda as it allows for immutable state objects to
be created
#. A Java Bean getter for each of the properties in the constructor, with the names matching up. For example, for a constructor
parameter ``foo``, there must be a getter called ``getFoo()``. If the type of ``foo`` is boolean, the getter may
optionally be called ``isFoo()``. This is why the class must be compiled with parameter names turned on.
#. A constructor which takes all of the properties that you wish to record in the serialized form. This is required in
order for the serialization framework to reconstruct an instance of your class.
#. If more than one constructor is provided, the serialization framework needs to know which one to use. The ``@ConstructorForDeserialization``
annotation can be used to indicate which one. For a Kotlin class, without the ``@ConstructorForDeserialization`` annotation, the
*primary constructor* will be selected.
#. The class must be compiled with parameter names included in the ``.class`` file. This is the default in Kotlin
but must be turned on in Java (``-parameters`` command line option to ``javac``).
#. A Java Bean getter for each of the properties in the constructor, with the names matching up. For example, for a constructor
parameter ``foo``, there must be a getter called ``getFoo()``. If the type of ``foo`` is boolean, the getter may
optionally be called ``isFoo()``. This is why the class must be compiled with parameter names turned on.
#. The class is annotated with ``@CordaSerializable``.
#. The declared types of constructor arguments / getters must be supported, and where generics are used the
generic parameter must be a supported type, an open wildcard (``*``), or a bounded wildcard which is currently
widened to an open wildcard.
#. Any superclass must adhere to the same rules, but can be abstract.
#. Object graph cycles are not supported, so an object cannot refer to itself, directly or indirectly.
In Kotlin, this maps cleanly to a data class where there getters are synthesized automatically. For example,
.. container:: codeset
.. sourcecode:: kotlin
data class Example (val a: Int, val b: String)
Both properties a and b will be included in the serialised form. However, as stated above, properties not mentioned in
the constructor will not be serialised. For example, in the following code property c will not be considered part of the
serialised form
.. container:: codeset
.. sourcecode:: kotlin
data class Example (val a: Int, val b: String) {
var c: Int = 20
}
var e = Example (10, "hello")
e.c = 100;
val e2 = e.serialize().deserialize() // e2.c will be 20, not 100!!!
.. warning:: Private properties in Kotlin classes render the class unserializable *unless* a public
getter is manually defined. For example:
.. container:: codeset
.. sourcecode:: kotlin
data class C(val a: Int, private val b: Int) {
// Without this C cannot be serialized
public fun getB() = b
}
.. note:: This is particularly relevant as IDE's can often point out where they believe a
property can be made private without knowing this can break Corda serialization. Should
this happen then a run time warning will be generated when the class fails to serialize
Setter Instantiation
''''''''''''''''''''
As an alternative to constructor based initialisation Corda can also determine the important elements of an
object by inspecting the getter and setter methods present on a class. If a class has **only** a default
constructor **and** properties then the serializable properties will be determined by the presence of
both a getter and setter for that property that are both publicly visible. I.e. the class adheres to
the classic *idiom* of mutable JavaBeans.
On deserialization, a default instance will first be created and then, in turn, the setters invoked
on that object to populate the correct values.
For example:
.. container:: codeset
.. sourcecode:: Java
class Example {
private int a;
private int b;
private int c;
public int getA() { return a; }
public int getB() { return b; }
public int getC() { return c; }
public void setA(int a) { this.a = a; }
public void setB(int b) { this.b = b; }
public void setC(int c) { this.c = c; }
}
Enums
`````
#. All enums are supported, provided they are annotated with ``@CordaSerializable``.
.. warning:: Use of enums in CorDapps requires potentially deeper consideration than in other application environments
due to the challenges of simultaneously upgrading the code on all nodes. It is therefore important to consider the code
evolution perspective, since an older version of the enum code cannot
accommodate a newly added element of the enum in a new version of the enum code. See `Type Evolution`_. Hence, enums are
a good fit for genuinely static data that will *never* change. e.g. Days of the week is not going to be extended any time
soon and is indeed an enum in the Java library. A Buy or Sell indicator is another. However, something like
Trade Type or Currency Code is likely not, since who's to say a new trade type or currency will not come along soon. For
those it is better to choose another representation: perhaps just a string.
Exceptions
``````````
@ -276,16 +371,22 @@ Future Enhancements
static method responsible for returning the singleton instance.
#. Instance internalizing support. We will add support for identifying classes that should be resolved against an instances map to avoid
creating many duplicate instances that are equal. Similar to ``String.intern()``.
#. Enum evolution support. We *may* introduce an annotation that can be applied to an enum element to indicate that
if an unrecognised enum entry is deserialized from a newer version of the code, it should be converted to that
element in the older version of the code. This is dependent on identifying a suitable use case, since it does
mutate the data when transported to another node, which could be considered hazardous.
.. Type Evolution:
Type Evolution
--------------
When we move to AMQP as the serialization format, we will be adding explicit support for interoperability of different versions of the same code.
We will describe here the rules and features for evolvability as part of a future update to the documentation.
Type evolution is the mechanisms by which classes can be altered over time yet still remain serializable and deserializable across
all versions of the class. This ensures an object serialized with an older idea of what the class "looked like" can be deserialized
and a version of the current state of the class instantiated.
More detail can be found in :doc:`serialization-default-evolution`
Enum Evolution
``````````````
Corda supports interoperability of enumerated type versions. This allows such types to be changed over time without breaking
backward (or forward) compatibility. The rules and mechanisms for doing this are discussed in :doc:`serialization-enum-evolution``

View File

@ -28,7 +28,7 @@ class Node(private val project: Project) : CordformNode() {
* @note Type is any due to gradle's use of "GStrings" - each value will have "toString" called on it
*/
var cordapps = mutableListOf<Any>()
var additionalCordapps = mutableListOf<File>()
internal var additionalCordapps = mutableListOf<File>()
internal lateinit var nodeDir: File
private set
internal lateinit var rootDir: File

View File

@ -70,7 +70,7 @@ class CorDappCustomSerializer(
data.withDescribed(descriptor) {
data.withList {
for (property in proxySerializer.propertySerializers) {
for (property in proxySerializer.propertySerializers.getters) {
property.writeProperty(proxy, this, output)
}
}

View File

@ -132,7 +132,7 @@ abstract class CustomSerializer<T : Any> : AMQPSerializer<T>, SerializerFor {
override fun writeDescribedObject(obj: T, data: Data, type: Type, output: SerializationOutput) {
val proxy = toProxy(obj)
data.withList {
for (property in proxySerializer.propertySerializers) {
for (property in proxySerializer.propertySerializers.getters) {
property.writeProperty(proxy, this, output)
}
}

View File

@ -20,7 +20,7 @@ class EvolutionSerializer(
override val kotlinConstructor: KFunction<Any>?) : ObjectSerializer(clazz, factory) {
// explicitly set as empty to indicate it's unused by this type of serializer
override val propertySerializers: Collection<PropertySerializer> = emptyList()
override val propertySerializers = ConstructorDestructorMethods (emptyList(), emptyList())
/**
* Represents a parameter as would be passed to the constructor of the class as it was

View File

@ -2,6 +2,7 @@ package net.corda.nodeapi.internal.serialization.amqp
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory.Companion.nameForType
import org.apache.qpid.proton.amqp.Symbol
import org.apache.qpid.proton.codec.Data
@ -21,7 +22,7 @@ open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPS
private val logger = contextLogger()
}
open internal val propertySerializers: Collection<PropertySerializer> by lazy {
open internal val propertySerializers: ConstructorDestructorMethods by lazy {
propertiesForSerialization(kotlinConstructor, clazz, factory)
}
@ -37,7 +38,7 @@ open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPS
for (iface in interfaces) {
output.requireSerializer(iface)
}
for (property in propertySerializers) {
for (property in propertySerializers.getters) {
property.writeClassInfo(output)
}
}
@ -48,7 +49,7 @@ open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPS
data.withDescribed(typeNotation.descriptor) {
// Write list
withList {
for (property in propertySerializers) {
for (property in propertySerializers.getters) {
property.writeProperty(obj, this, output)
}
}
@ -60,23 +61,59 @@ open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPS
schemas: SerializationSchemas,
input: DeserializationInput): Any = ifThrowsAppend({ clazz.typeName }) {
if (obj is List<*>) {
if (obj.size > propertySerializers.size) {
if (obj.size > propertySerializers.getters.size) {
throw NotSerializableException("Too many properties in described type $typeName")
}
val params = obj.zip(propertySerializers).map { it.second.readProperty(it.first, schemas, input) }
construct(params)
return if (propertySerializers.setters.isEmpty()) {
readObjectBuildViaConstructor(obj, schemas, input)
} else {
readObjectBuildViaSetters(obj, schemas, input)
}
} else throw NotSerializableException("Body of described type is unexpected $obj")
}
private fun readObjectBuildViaConstructor(
obj: List<*>,
schemas: SerializationSchemas,
input: DeserializationInput) : Any = ifThrowsAppend({ clazz.typeName }){
logger.trace { "Calling construction based construction for ${clazz.typeName}" }
return construct(obj.zip(propertySerializers.getters).map { it.second.readProperty(it.first, schemas, input) })
}
private fun readObjectBuildViaSetters(
obj: List<*>,
schemas: SerializationSchemas,
input: DeserializationInput) : Any = ifThrowsAppend({ clazz.typeName }){
logger.trace { "Calling setter based construction for ${clazz.typeName}" }
val instance : Any = javaConstructor?.newInstance() ?: throw NotSerializableException (
"Failed to instantiate instance of object $clazz")
// read the properties out of the serialised form
val propertiesFromBlob = obj
.zip(propertySerializers.getters)
.map { it.second.readProperty(it.first, schemas, input) }
// one by one take a property and invoke the setter on the class
propertySerializers.setters.zip(propertiesFromBlob).forEach {
it.first?.invoke(instance, *listOf(it.second).toTypedArray())
}
return instance
}
private fun generateFields(): List<Field> {
return propertySerializers.map { Field(it.name, it.type, it.requires, it.default, null, it.mandatory, false) }
return propertySerializers.getters.map {
Field(it.name, it.type, it.requires, it.default, null, it.mandatory, false)
}
}
private fun generateProvides(): List<String> = interfaces.map { nameForType(it) }
fun construct(properties: List<Any?>): Any {
logger.debug { "Calling constructor: '$javaConstructor' with properties '$properties'" }
logger.trace { "Calling constructor: '$javaConstructor' with properties '$properties'" }
return javaConstructor?.newInstance(*properties.toTypedArray()) ?:
throw NotSerializableException("Attempt to deserialize an interface: $clazz. Serialized form is invalid.")

View File

@ -419,8 +419,11 @@ private fun isCollectionOrMap(type: Class<*>) = (Collection::class.java.isAssign
private fun fingerprintForObject(type: Type, contextType: Type?, alreadySeen: MutableSet<Type>, hasher: Hasher, factory: SerializerFactory): Hasher {
// Hash the class + properties + interfaces
val name = type.asClass()?.name ?: throw NotSerializableException("Expected only Class or ParameterizedType but found $type")
propertiesForSerialization(constructorForDeserialization(type), contextType ?: type, factory).fold(hasher.putUnencodedChars(name)) { orig, prop ->
fingerprintForType(prop.resolvedType, type, alreadySeen, orig, factory).putUnencodedChars(prop.name).putUnencodedChars(if (prop.mandatory) NOT_NULLABLE_HASH else NULLABLE_HASH)
propertiesForSerialization(constructorForDeserialization(type), contextType ?: type, factory).getters
.fold(hasher.putUnencodedChars(name)) { orig, prop ->
fingerprintForType(prop.resolvedType, type, alreadySeen, orig, factory)
.putUnencodedChars(prop.name)
.putUnencodedChars(if (prop.mandatory) NOT_NULLABLE_HASH else NULLABLE_HASH)
}
interfacesForSerialization(type, factory).map { fingerprintForType(it, type, alreadySeen, hasher, factory) }
return hasher

View File

@ -2,12 +2,14 @@ package net.corda.nodeapi.internal.serialization.amqp
import com.google.common.primitives.Primitives
import com.google.common.reflect.TypeToken
import io.netty.util.internal.EmptyArrays
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationContext
import org.apache.qpid.proton.codec.Data
import java.beans.IndexedPropertyDescriptor
import java.beans.Introspector
import java.beans.PropertyDescriptor
import java.io.NotSerializableException
import java.lang.reflect.*
import java.util.*
@ -26,6 +28,10 @@ import kotlin.reflect.jvm.javaType
@Retention(AnnotationRetention.RUNTIME)
annotation class ConstructorForDeserialization
data class ConstructorDestructorMethods(
val getters : Collection<PropertySerializer>,
val setters : Collection<Method?>)
/**
* Code for finding the constructor we will use for deserialization.
*
@ -67,18 +73,30 @@ internal fun constructorForDeserialization(type: Type): KFunction<Any>? {
* Note, you will need any Java classes to be compiled with the `-parameters` option to ensure constructor parameters have
* names accessible via reflection.
*/
internal fun <T : Any> propertiesForSerialization(kotlinConstructor: KFunction<T>?, type: Type, factory: SerializerFactory): Collection<PropertySerializer> {
internal fun <T : Any> propertiesForSerialization(kotlinConstructor: KFunction<T>?, type: Type, factory: SerializerFactory): ConstructorDestructorMethods {
val clazz = type.asClass()!!
return if (kotlinConstructor != null) propertiesForSerializationFromConstructor(kotlinConstructor, type, factory) else propertiesForSerializationFromAbstract(clazz, type, factory)
}
fun isConcrete(clazz: Class<*>): Boolean = !(clazz.isInterface || Modifier.isAbstract(clazz.modifiers))
private fun <T : Any> propertiesForSerializationFromConstructor(kotlinConstructor: KFunction<T>, type: Type, factory: SerializerFactory): Collection<PropertySerializer> {
private fun <T : Any> propertiesForSerializationFromConstructor(
kotlinConstructor: KFunction<T>,
type: Type,
factory: SerializerFactory): ConstructorDestructorMethods {
val clazz = (kotlinConstructor.returnType.classifier as KClass<*>).javaObjectType
// Kotlin reflection doesn't work with Java getters the way you might expect, so we drop back to good ol' beans.
val properties = Introspector.getBeanInfo(clazz).propertyDescriptors.filter { it.name != "class" }.groupBy { it.name }.mapValues { it.value[0] }
val properties = Introspector.getBeanInfo(clazz).propertyDescriptors
.filter { it.name != "class" }
.groupBy { it.name }
.mapValues { it.value[0] }
if (properties.isNotEmpty() && kotlinConstructor.parameters.isEmpty()) {
return propertiesForSerializationFromSetters(properties, type, factory)
}
val rc: MutableList<PropertySerializer> = ArrayList(kotlinConstructor.parameters.size)
for (param in kotlinConstructor.parameters) {
val name = param.name ?: throw NotSerializableException("Constructor parameter of $clazz has no name.")
@ -103,7 +121,37 @@ private fun <T : Any> propertiesForSerializationFromConstructor(kotlinConstructo
throw NotSerializableException("Property type $returnType for $name of $clazz differs from constructor parameter type ${param.type.javaType}")
}
}
return rc
return ConstructorDestructorMethods(rc, emptyList())
}
/**
* If we determine a class has a constructor that takes no parameters then check for pairs of getters / setters
* and use those
*/
private fun propertiesForSerializationFromSetters(
properties : Map<String, PropertyDescriptor>,
type: Type,
factory: SerializerFactory): ConstructorDestructorMethods {
val getters : MutableList<PropertySerializer> = ArrayList(properties.size)
val setters : MutableList<Method?> = ArrayList(properties.size)
properties.forEach { property ->
val getter: Method? = property.value.readMethod
val setter: Method? = property.value.writeMethod
if (getter == null || setter == null) return@forEach
// NOTE: There is no need to check return and parameter types vs the underlying type for
// the getter / setter vs property as if there is a difference then that property isn't reported
// by the BEAN inspector and thus we don't consider that case here
getters += PropertySerializer.make(property.key, getter, resolveTypeVariables(getter.genericReturnType, type),
factory)
setters += setter
}
return ConstructorDestructorMethods(getters, setters)
}
private fun constructorParamTakesReturnTypeOfGetter(getterReturnType: Type, rawGetterReturnType: Type, param: KParameter): Boolean {
@ -111,7 +159,7 @@ private fun constructorParamTakesReturnTypeOfGetter(getterReturnType: Type, rawG
return typeToken.isSupertypeOf(getterReturnType) || typeToken.isSupertypeOf(rawGetterReturnType)
}
private fun propertiesForSerializationFromAbstract(clazz: Class<*>, type: Type, factory: SerializerFactory): Collection<PropertySerializer> {
private fun propertiesForSerializationFromAbstract(clazz: Class<*>, type: Type, factory: SerializerFactory): ConstructorDestructorMethods {
// Kotlin reflection doesn't work with Java getters the way you might expect, so we drop back to good ol' beans.
val properties = Introspector.getBeanInfo(clazz).propertyDescriptors.filter { it.name != "class" }.sortedBy { it.name }.filterNot { it is IndexedPropertyDescriptor }
val rc: MutableList<PropertySerializer> = ArrayList(properties.size)
@ -121,7 +169,7 @@ private fun propertiesForSerializationFromAbstract(clazz: Class<*>, type: Type,
val returnType = resolveTypeVariables(getter.genericReturnType, type)
rc += PropertySerializer.make(property.name, getter, returnType, factory)
}
return rc
return ConstructorDestructorMethods(rc, emptyList())
}
internal fun interfacesForSerialization(type: Type, serializerFactory: SerializerFactory): List<Type> {

View File

@ -8,6 +8,10 @@ import java.nio.ByteBuffer
import java.util.*
import kotlin.collections.LinkedHashSet
data class BytesAndSchemas<T : Any>(
val obj: SerializedBytes<T>,
val schema: Schema,
val transformsSchema: TransformsSchema)
/**
* Main entry point for serializing an object to AMQP.
@ -35,6 +39,18 @@ open class SerializationOutput(internal val serializerFactory: SerializerFactory
}
}
@Throws(NotSerializableException::class)
fun <T : Any> serializeAndReturnSchema(obj: T): BytesAndSchemas<T> {
try {
val blob = _serialize(obj)
val schema = Schema(schemaHistory.toList())
return BytesAndSchemas(blob, schema, TransformsSchema.build(schema, serializerFactory))
} finally {
andFinally()
}
}
internal fun andFinally() {
objectHistory.clear()
serializerHistory.clear()

View File

@ -14,7 +14,7 @@ class OpaqueBytesSubSequenceSerializer(factory: SerializerFactory) :
CustomSerializer.Proxy<OpaqueBytesSubSequence, OpaqueBytes>(OpaqueBytesSubSequence::class.java, OpaqueBytes::class.java, factory) {
override val additionalSerializers: Iterable<CustomSerializer<out Any>> = emptyList()
override fun toProxy(obj: OpaqueBytesSubSequence): OpaqueBytes = obj.copy() as OpaqueBytes
override fun toProxy(obj: OpaqueBytesSubSequence): OpaqueBytes = OpaqueBytes(obj.copy().bytes)
override fun fromProxy(proxy: OpaqueBytes): OpaqueBytesSubSequence = OpaqueBytesSubSequence(proxy.bytes, proxy.offset, proxy.size)
}

View File

@ -24,7 +24,7 @@ class ThrowableSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<T
try {
val constructor = constructorForDeserialization(obj.javaClass)
val props = propertiesForSerialization(constructor, obj.javaClass, factory)
for (prop in props) {
for (prop in props.getters) {
extraProperties[prop.name] = prop.readMethod!!.invoke(obj)
}
} catch (e: NotSerializableException) {

View File

@ -0,0 +1,295 @@
package net.corda.nodeapi.internal.serialization.amqp;
import net.corda.core.serialization.SerializedBytes;
import net.corda.nodeapi.internal.serialization.AllWhitelist;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import static org.junit.Assert.*;
import java.io.NotSerializableException;
public class SetterConstructorTests {
static class C {
private int a;
private int b;
private int c;
public int getA() { return a; }
public int getB() { return b; }
public int getC() { return c; }
public void setA(int a) { this.a = a; }
public void setB(int b) { this.b = b; }
public void setC(int c) { this.c = c; }
}
static class C2 {
private int a;
private int b;
private int c;
public int getA() { return a; }
public int getB() { return b; }
public int getC() { return c; }
public void setA(int a) { this.a = a; }
public void setB(int b) { this.b = b; }
}
static class C3 {
private int a;
private int b;
private int c;
public int getA() { return a; }
public int getC() { return c; }
public void setA(int a) { this.a = a; }
public void setB(int b) { this.b = b; }
public void setC(int c) { this.c = c; }
}
static class C4 {
private int a;
private int b;
private int c;
public int getA() { return a; }
protected int getB() { return b; }
public int getC() { return c; }
private void setA(int a) { this.a = a; }
public void setB(int b) { this.b = b; }
public void setC(int c) { this.c = c; }
}
static class Inner1 {
private String a;
public Inner1(String a) { this.a = a; }
public String getA() { return this.a; }
}
static class Inner2 {
private Double a;
public Double getA() { return this.a; }
public void setA(Double a) { this.a = a; }
}
static class Outer {
private Inner1 a;
private String b;
private Inner2 c;
public Inner1 getA() { return a; }
public String getB() { return b; }
public Inner2 getC() { return c; }
public void setA(Inner1 a) { this.a = a; }
public void setB(String b) { this.b = b; }
public void setC(Inner2 c) { this.c = c; }
}
static class TypeMismatch {
private Integer a;
public void setA(Integer a) { this.a = a; }
public String getA() { return this.a.toString(); }
}
static class TypeMismatch2 {
private Integer a;
public void setA(String a) { this.a = Integer.parseInt(a); }
public Integer getA() { return this.a; }
}
// despite having no constructor we should still be able to serialise an instance of C
@Test
public void serialiseC() throws NotSerializableException {
SerializerFactory factory1 = new SerializerFactory(AllWhitelist.INSTANCE, ClassLoader.getSystemClassLoader());
SerializationOutput ser = new SerializationOutput(factory1);
C c1 = new C();
c1.setA(1);
c1.setB(2);
c1.setC(3);
Schema schemas = ser.serializeAndReturnSchema(c1).component2();
assertEquals(1, schemas.component1().size());
assertEquals(this.getClass().getName() + "$C", schemas.component1().get(0).getName());
CompositeType ct = (CompositeType) schemas.component1().get(0);
assertEquals(3, ct.getFields().size());
assertEquals("a", ct.getFields().get(0).getName());
assertEquals("b", ct.getFields().get(1).getName());
assertEquals("c", ct.getFields().get(2).getName());
// No setter for c so should only serialise two properties
C2 c2 = new C2();
c2.setA(1);
c2.setB(2);
schemas = ser.serializeAndReturnSchema(c2).component2();
assertEquals(1, schemas.component1().size());
assertEquals(this.getClass().getName() + "$C2", schemas.component1().get(0).getName());
ct = (CompositeType) schemas.component1().get(0);
// With no setter for c we should only serialise a and b into the stream
assertEquals(2, ct.getFields().size());
assertEquals("a", ct.getFields().get(0).getName());
assertEquals("b", ct.getFields().get(1).getName());
// no getter for b so shouldn't serialise it,thus only a and c should apper in the envelope
C3 c3 = new C3();
c3.setA(1);
c3.setB(2);
c3.setC(3);
schemas = ser.serializeAndReturnSchema(c3).component2();
assertEquals(1, schemas.component1().size());
assertEquals(this.getClass().getName() + "$C3", schemas.component1().get(0).getName());
ct = (CompositeType) schemas.component1().get(0);
// With no setter for c we should only serialise a and b into the stream
assertEquals(2, ct.getFields().size());
assertEquals("a", ct.getFields().get(0).getName());
assertEquals("c", ct.getFields().get(1).getName());
C4 c4 = new C4();
c4.setA(1);
c4.setB(2);
c4.setC(3);
schemas = ser.serializeAndReturnSchema(c4).component2();
assertEquals(1, schemas.component1().size());
assertEquals(this.getClass().getName() + "$C4", schemas.component1().get(0).getName());
ct = (CompositeType) schemas.component1().get(0);
// With non public visibility on a setter and getter for a and b, only c should be serialised
assertEquals(1, ct.getFields().size());
assertEquals("c", ct.getFields().get(0).getName());
}
@Test
public void deserialiseC() throws NotSerializableException {
SerializerFactory factory1 = new SerializerFactory(AllWhitelist.INSTANCE, ClassLoader.getSystemClassLoader());
C cPre1 = new C();
int a = 1;
int b = 2;
int c = 3;
cPre1.setA(a);
cPre1.setB(b);
cPre1.setC(c);
SerializedBytes bytes = new SerializationOutput(factory1).serialize(cPre1);
C cPost1 = new DeserializationInput(factory1).deserialize(bytes, C.class);
assertEquals(a, cPost1.a);
assertEquals(b, cPost1.b);
assertEquals(c, cPost1.c);
C2 cPre2 = new C2();
cPre2.setA(1);
cPre2.setB(2);
C2 cPost2 = new DeserializationInput(factory1).deserialize(new SerializationOutput(factory1).serialize(cPre2),
C2.class);
assertEquals(a, cPost2.a);
assertEquals(b, cPost2.b);
// no setter for c means nothing will be serialised and thus it will have the default value of zero
// set
assertEquals(0, cPost2.c);
C3 cPre3 = new C3();
cPre3.setA(1);
cPre3.setB(2);
cPre3.setC(3);
C3 cPost3 = new DeserializationInput(factory1).deserialize(new SerializationOutput(factory1).serialize(cPre3),
C3.class);
assertEquals(a, cPost3.a);
// no getter for b means, as before, it'll have been not set and will thus be defaulted to 0
assertEquals(0, cPost3.b);
assertEquals(c, cPost3.c);
C4 cPre4 = new C4();
cPre4.setA(1);
cPre4.setB(2);
cPre4.setC(3);
C4 cPost4 = new DeserializationInput(factory1).deserialize(new SerializationOutput(factory1).serialize(cPre4),
C4.class);
assertEquals(0, cPost4.a);
assertEquals(0, cPost4.b);
assertEquals(c, cPost4.c);
}
@Test
public void serialiseOuterAndInner() throws NotSerializableException {
SerializerFactory factory1 = new SerializerFactory(AllWhitelist.INSTANCE, ClassLoader.getSystemClassLoader());
Inner1 i1 = new Inner1("Hello");
Inner2 i2 = new Inner2();
i2.setA(10.5);
Outer o = new Outer();
o.setA(i1);
o.setB("World");
o.setC(i2);
Outer post = new DeserializationInput(factory1).deserialize(new SerializationOutput(factory1).serialize(o),
Outer.class);
assertEquals("Hello", post.a.a);
assertEquals("World", post.b);
assertEquals((Double)10.5, post.c.a);
}
@Test
public void typeMistmatch() throws NotSerializableException {
SerializerFactory factory1 = new SerializerFactory(AllWhitelist.INSTANCE, ClassLoader.getSystemClassLoader());
TypeMismatch tm = new TypeMismatch();
tm.setA(10);
assertEquals("10", tm.getA());
TypeMismatch post = new DeserializationInput(factory1).deserialize(new SerializationOutput(factory1).serialize(tm),
TypeMismatch.class);
// because there is a type mismatch in the class, it won't return that info as a BEAN property and thus
// we won't serialise it and thus on deserialization it won't be initialized
Assertions.assertThatThrownBy(() -> post.getA()).isInstanceOf(NullPointerException.class);
}
@Test
public void typeMistmatch2() throws NotSerializableException {
SerializerFactory factory1 = new SerializerFactory(AllWhitelist.INSTANCE, ClassLoader.getSystemClassLoader());
TypeMismatch2 tm = new TypeMismatch2();
tm.setA("10");
assertEquals((Integer)10, tm.getA());
TypeMismatch2 post = new DeserializationInput(factory1).deserialize(new SerializationOutput(factory1).serialize(tm),
TypeMismatch2.class);
// because there is a type mismatch in the class, it won't return that info as a BEAN property and thus
// we won't serialise it and thus on deserialization it won't be initialized
assertEquals(null, post.getA());
}
}

View File

@ -30,20 +30,3 @@ class TestSerializationOutput(
fun testName(): String = Thread.currentThread().stackTrace[2].methodName
data class BytesAndSchemas<T : Any>(
val obj: SerializedBytes<T>,
val schema: Schema,
val transformsSchema: TransformsSchema)
// Extension for the serialize routine that returns the scheme encoded into the
// bytes as well as the bytes for simple testing
@Throws(NotSerializableException::class)
fun <T : Any> SerializationOutput.serializeAndReturnSchema(obj: T): BytesAndSchemas<T> {
try {
val blob = _serialize(obj)
val schema = Schema(schemaHistory.toList())
return BytesAndSchemas(blob, schema, TransformsSchema.build(schema, serializerFactory))
} finally {
andFinally()
}
}

View File

@ -28,6 +28,7 @@ import net.corda.testing.internal.*
import net.corda.testing.services.MockAttachmentStorage
import org.junit.Assert.assertEquals
import org.junit.ClassRule
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import java.net.URLClassLoader
@ -103,6 +104,7 @@ class AttachmentLoadingTests : IntegrationTest() {
assertEquals(expected, actual)
}
@Ignore("Test has undeterministic capacity to hang, ignore till fixed")
@Test
fun `test that attachments retrieved over the network are not used for code`() = withoutTestSerialization {
driver {
@ -115,6 +117,7 @@ class AttachmentLoadingTests : IntegrationTest() {
Unit
}
@Ignore("Test has undeterministic capacity to hang, ignore till fixed")
@Test
fun `tests that if the attachment is loaded on both sides already that a flow can run`() = withoutTestSerialization {
driver {

View File

@ -25,6 +25,7 @@ import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.NotarySpec
import org.assertj.core.api.Assertions.assertThat
import org.junit.ClassRule
import org.junit.Ignore
import org.junit.Test
import rx.Observable
import java.util.*
@ -82,6 +83,7 @@ class DistributedServiceTests : IntegrationTest() {
// TODO Use a dummy distributed service rather than a Raft Notary Service as this test is only about Artemis' ability
// to handle distributed services
@Ignore("Test has undeterministic capacity to hang, ignore till fixed")
@Test
fun `requests are distributed evenly amongst the nodes`() = setup {
// Issue 100 pounds, then pay ourselves 50x2 pounds
@ -110,6 +112,7 @@ class DistributedServiceTests : IntegrationTest() {
}
// TODO This should be in RaftNotaryServiceTests
@Ignore("Test has undeterministic capacity to hang, ignore till fixed")
@Test
fun `cluster survives if a notary is killed`() = setup {
// Issue 100 pounds, then pay ourselves 10x5 pounds

View File

@ -14,6 +14,7 @@ import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.join
import net.corda.core.internal.until
import net.corda.core.node.StateLoader
import net.corda.core.schemas.PersistentStateRef
@ -24,12 +25,11 @@ import net.corda.node.internal.CordaClock
import net.corda.node.internal.MutableClock
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.SchedulerService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.activemq.artemis.utils.ReusableLatch
import java.time.Clock
import org.slf4j.Logger
import java.time.Instant
import java.util.*
import java.util.concurrent.*
@ -51,23 +51,21 @@ import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture
* is the outcome of the activity in order to schedule another activity. Once we have implemented more persistence
* in the nodes, maybe we can consider multiple activities and whether the activities have been completed or not,
* but that starts to sound a lot like off-ledger state.
*
* @param schedulerTimerExecutor The executor the scheduler blocks on waiting for the clock to advance to the next
* activity. Only replace this for unit testing purposes. This is not the executor the [FlowLogic] is launched on.
*/
@ThreadSafe
class NodeSchedulerService(private val clock: CordaClock,
private val database: CordaPersistence,
private val flowStarter: FlowStarter,
private val stateLoader: StateLoader,
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
private val serverThread: AffinityExecutor,
private val flowLogicRefFactory: FlowLogicRefFactory)
private val serverThread: Executor,
private val flowLogicRefFactory: FlowLogicRefFactory,
private val log: Logger = staticLog,
scheduledStates: MutableMap<StateRef, ScheduledStateRef> = createMap())
: SchedulerService, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
private val staticLog get() = contextLogger()
/**
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
* used in demos or testing. This will substitute a Fiber compatible Future so the current
@ -131,7 +129,7 @@ class NodeSchedulerService(private val clock: CordaClock,
* or [Throwable] is available in the original.
*
* We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context
* switch. There's no need to checkpoint our [Fiber]s as there's no external effect of waiting.
* switch. There's no need to checkpoint our [co.paralleluniverse.fibers.Fiber]s as there's no external effect of waiting.
*/
private fun <T : Any> makeStrandFriendlySettableFuture(future: Future<T>) = QuasarSettableFuture<Boolean>().also { g ->
when (future) {
@ -140,6 +138,9 @@ class NodeSchedulerService(private val clock: CordaClock,
else -> throw IllegalArgumentException("Cannot make future $future Strand friendly.")
}
}
@VisibleForTesting
internal val schedulingAsNextFormat = "Scheduling as next {}"
}
@Entity
@ -152,20 +153,17 @@ class NodeSchedulerService(private val clock: CordaClock,
var scheduledAt: Instant = Instant.now()
)
private class InnerState {
var scheduledStates = createMap()
private class InnerState(var scheduledStates: MutableMap<StateRef, ScheduledStateRef>) {
var scheduledStatesQueue: PriorityQueue<ScheduledStateRef> = PriorityQueue({ a, b -> a.scheduledAt.compareTo(b.scheduledAt) })
var rescheduled: GuavaSettableFuture<Boolean>? = null
}
private val mutex = ThreadBox(InnerState())
private val mutex = ThreadBox(InnerState(scheduledStates))
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
fun start() {
mutex.locked {
scheduledStatesQueue.addAll(scheduledStates.all().map { it.second }.toMutableList())
scheduledStatesQueue.addAll(scheduledStates.values)
rescheduleWakeUp()
}
}
@ -206,6 +204,7 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
/**
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
* [awaitWithDeadline] used below drops through without running the action. We then create a new
@ -223,7 +222,7 @@ class NodeSchedulerService(private val clock: CordaClock,
}
if (scheduledState != null) {
schedulerTimerExecutor.execute {
log.trace { "Scheduling as next $scheduledState" }
log.trace(schedulingAsNextFormat, scheduledState)
// This will block the scheduler single thread until the scheduled time (returns false) OR
// the Future is cancelled due to rescheduling (returns true).
if (!awaitWithDeadline(clock, scheduledState.scheduledAt, ourRescheduledFuture)) {
@ -236,6 +235,11 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
@VisibleForTesting
internal fun join() {
schedulerTimerExecutor.join()
}
private fun onTimeReached(scheduledState: ScheduledStateRef) {
serverThread.execute {
var flowName: String? = "(unknown)"

View File

@ -1,11 +1,9 @@
package net.corda.node.utilities
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.Uninterruptibles
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.function.Supplier
@ -83,31 +81,4 @@ interface AffinityExecutor : Executor {
} while (!f.get())
}
}
/**
* An executor useful for unit tests: allows the current thread to block until a command arrives from another
* thread, which is then executed. Inbound closures/commands stack up until they are cleared by looping.
*
* @param alwaysQueue If true, executeASAP will never short-circuit and will always queue up.
*/
class Gate(private val alwaysQueue: Boolean = false) : AffinityExecutor {
private val thisThread = Thread.currentThread()
private val commandQ = LinkedBlockingQueue<Runnable>()
override val isOnThread: Boolean
get() = !alwaysQueue && Thread.currentThread() === thisThread
override fun execute(command: Runnable) {
Uninterruptibles.putUninterruptibly(commandQ, command)
}
fun waitAndRun() {
val runnable = Uninterruptibles.takeUninterruptibly(commandQ)
runnable.run()
}
override fun flush() {
throw UnsupportedOperationException()
}
}
}

View File

@ -1,7 +1,6 @@
package net.corda.node.services.events
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.MoreExecutors
import com.nhaarman.mockito_kotlin.*
import net.corda.core.contracts.*
import net.corda.core.crypto.generateKeyPair
@ -9,316 +8,171 @@ import net.corda.core.crypto.newSecureRandom
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRef
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.days
import net.corda.node.internal.FlowStarterImpl
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.StateMachineManagerImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.internal.configureDatabase
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.*
import net.corda.testing.contracts.DummyContract
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.*
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.services.MockAttachmentStorage
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.StateLoader
import net.corda.node.services.api.FlowStarter
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.internal.doLookup
import net.corda.testing.node.TestClock
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TestWatcher
import org.junit.runner.Description
import org.slf4j.Logger
import java.time.Clock
import java.time.Instant
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.test.assertTrue
class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
private companion object {
val ALICE_KEY = TestIdentity(ALICE_NAME, 70).keyPair
val DUMMY_IDENTITY_1 = getTestPartyAndCertificate(Party(CordaX500Name("Dummy", "Madrid", "ES"), generateKeyPair().public))
val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20).party
val myInfo = NodeInfo(listOf(NetworkHostAndPort("mockHost", 30000)), listOf(DUMMY_IDENTITY_1), 1, serial = 1L)
class NodeSchedulerServiceTest {
private val mark = Instant.now()
private val testClock = TestClock(rigorousMock<Clock>().also {
doReturn(mark).whenever(it).instant()
})
private val database = rigorousMock<CordaPersistence>().also {
doAnswer {
val block: DatabaseTransaction.() -> Any? = uncheckedCast(it.arguments[0])
rigorousMock<DatabaseTransaction>().block()
}.whenever(it).transaction(any())
}
private val flowStarter = rigorousMock<FlowStarter>().also {
doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any())
}
private val transactionStates = mutableMapOf<StateRef, TransactionState<*>>()
private val stateLoader = rigorousMock<StateLoader>().also {
doLookup(transactionStates).whenever(it).loadState(any())
}
private val flows = mutableMapOf<FlowLogicRef, FlowLogic<*>>()
private val flowLogicRefFactory = rigorousMock<FlowLogicRefFactory>().also {
doLookup(flows).whenever(it).toFlowLogic(any())
}
private val log = rigorousMock<Logger>().also {
doReturn(false).whenever(it).isTraceEnabled
doNothing().whenever(it).trace(any(), any<Any>())
}
private val scheduler = NodeSchedulerService(
testClock,
database,
flowStarter,
stateLoader,
serverThread = MoreExecutors.directExecutor(),
flowLogicRefFactory = flowLogicRefFactory,
log = log,
scheduledStates = mutableMapOf()).apply { start() }
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private val flowLogicRefFactory = FlowLogicRefFactoryImpl(FlowLogicRefFactoryImpl::class.java.classLoader)
private val realClock: Clock = Clock.systemUTC()
private val stoppedClock: Clock = Clock.fixed(realClock.instant(), realClock.zone)
private val testClock = TestClock(stoppedClock)
private val schedulerGatedExecutor = AffinityExecutor.Gate(true)
abstract class Services : ServiceHubInternal, TestReference
private lateinit var services: Services
private lateinit var scheduler: NodeSchedulerService
private lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor
private lateinit var database: CordaPersistence
private lateinit var countDown: CountDownLatch
private lateinit var smmHasRemovedAllFlows: CountDownLatch
private lateinit var kms: MockKeyManagementService
private lateinit var mockSMM: StateMachineManager
var calls: Int = 0
/**
* Have a reference to this test added to [ServiceHub] so that when the [FlowLogic] runs it can access the test instance.
* The [TestState] is serialized and deserialized so attempting to use a transient field won't work, as it just
* results in NPE.
*/
interface TestReference {
val testReference: NodeSchedulerServiceTest
}
@Before
fun setup() {
countDown = CountDownLatch(1)
smmHasRemovedAllFlows = CountDownLatch(1)
calls = 0
val dataSourceProps = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock())
val identityService = makeTestIdentityService()
kms = MockKeyManagementService(identityService, ALICE_KEY)
val configuration = rigorousMock<NodeConfiguration>().also {
doReturn(true).whenever(it).devMode
doReturn(null).whenever(it).devModeOptions
}
val validatedTransactions = MockTransactionStorage()
database.transaction {
services = rigorousMock<Services>().also {
doReturn(configuration).whenever(it).configuration
doReturn(MonitoringService(MetricRegistry())).whenever(it).monitoringService
doReturn(validatedTransactions).whenever(it).validatedTransactions
doReturn(rigorousMock<NetworkMapCacheInternal>().also {
doReturn(doneFuture(null)).whenever(it).nodeReady
}).whenever(it).networkMapCache
doReturn(myInfo).whenever(it).myInfo
doReturn(kms).whenever(it).keyManagementService
doReturn(CordappProviderImpl(CordappLoader.createWithTestPackages(listOf("net.corda.testing.contracts")), MockAttachmentStorage())).whenever(it).cordappProvider
doReturn(NodeVaultService(testClock, kms, validatedTransactions, database.hibernateConfig)).whenever(it).vaultService
doReturn(this@NodeSchedulerServiceTest).whenever(it).testReference
}
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
mockSMM = StateMachineManagerImpl(services, DBCheckpointStorage(), smmExecutor, database, newSecureRandom())
scheduler = NodeSchedulerService(testClock, database, FlowStarterImpl(smmExecutor, mockSMM, flowLogicRefFactory), validatedTransactions, schedulerGatedExecutor, serverThread = smmExecutor, flowLogicRefFactory = flowLogicRefFactory)
mockSMM.changes.subscribe { change ->
if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) {
smmHasRemovedAllFlows.countDown()
}
}
mockSMM.start(emptyList())
scheduler.start()
val tearDown = object : TestWatcher() {
override fun succeeded(description: Description) {
scheduler.join()
verifyNoMoreInteractions(flowStarter)
}
}
private var allowedUnsuspendedFiberCount = 0
@After
fun tearDown() {
// We need to make sure the StateMachineManager is done before shutting down executors.
if (mockSMM.allStateMachines.isNotEmpty()) {
smmHasRemovedAllFlows.await()
}
smmExecutor.shutdown()
smmExecutor.awaitTermination(60, TimeUnit.SECONDS)
database.close()
mockSMM.stop(allowedUnsuspendedFiberCount)
private class Event(time: Instant) {
val stateRef = rigorousMock<StateRef>()
val flowLogic = rigorousMock<FlowLogic<*>>()
val ssr = ScheduledStateRef(stateRef, time)
}
// Ignore IntelliJ when it says these properties can be private, if they are we cannot serialise them
// in AMQP.
@Suppress("MemberVisibilityCanPrivate")
class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant, val myIdentity: Party) : LinearState, SchedulableState {
override val participants: List<AbstractParty>
get() = listOf(myIdentity)
override val linearId = UniqueIdentifier()
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
return ScheduledActivity(flowLogicRef, instant)
private fun schedule(time: Instant) = Event(time).apply {
val logicRef = rigorousMock<FlowLogicRef>()
transactionStates[stateRef] = rigorousMock<TransactionState<SchedulableState>>().also {
doReturn(rigorousMock<SchedulableState>().also {
doReturn(ScheduledActivity(logicRef, time)).whenever(it).nextScheduledActivity(same(stateRef)!!, any())
}).whenever(it).data
}
flows[logicRef] = flowLogic
scheduler.scheduleStateActivity(ssr)
}
class TestFlowLogic(private val increment: Int = 1) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
(serviceHub as TestReference).testReference.calls += increment
(serviceHub as TestReference).testReference.countDown.countDown()
}
private fun assertWaitingFor(event: Event, total: Int = 1) {
// The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace:
verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, event.ssr)
}
class Command : TypeOnlyCommandData()
private fun assertStarted(event: Event) {
// Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow:
verify(flowStarter, timeout(5000)).startFlow(same(event.flowLogic)!!, any())
}
@Test
fun `test activity due now`() {
val time = stoppedClock.instant()
scheduleTX(time)
assertThat(calls).isEqualTo(0)
schedulerGatedExecutor.waitAndRun()
countDown.await()
assertThat(calls).isEqualTo(1)
assertStarted(schedule(mark))
}
@Test
fun `test activity due in the past`() {
val time = stoppedClock.instant() - 1.days
scheduleTX(time)
assertThat(calls).isEqualTo(0)
schedulerGatedExecutor.waitAndRun()
countDown.await()
assertThat(calls).isEqualTo(1)
assertStarted(schedule(mark - 1.days))
}
@Test
fun `test activity due in the future`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
val event = schedule(mark + 1.days)
assertWaitingFor(event)
testClock.advanceBy(1.days)
backgroundExecutor.shutdown()
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
countDown.await()
assertThat(calls).isEqualTo(1)
assertStarted(event)
}
@Test
fun `test activity due in the future and schedule another earlier`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time + 1.days)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time, 3)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
val event2 = schedule(mark + 2.days)
val event1 = schedule(mark + 1.days)
assertWaitingFor(event1)
testClock.advanceBy(1.days)
countDown.await()
assertThat(calls).isEqualTo(3)
backgroundExecutor.shutdown()
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
assertStarted(event1)
assertWaitingFor(event2, 2)
testClock.advanceBy(1.days)
assertStarted(event2)
}
@Test
fun `test activity due in the future and schedule another later`() {
allowedUnsuspendedFiberCount = 1
val time = stoppedClock.instant() + 1.days
scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time + 1.days, 3)
val event1 = schedule(mark + 1.days)
val event2 = schedule(mark + 2.days)
assertWaitingFor(event1)
testClock.advanceBy(1.days)
countDown.await()
assertThat(calls).isEqualTo(1)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertStarted(event1)
assertWaitingFor(event2)
testClock.advanceBy(1.days)
backgroundExecutor.shutdown()
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
assertStarted(event2)
}
@Test
fun `test activity due in the future and schedule another for same time`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time, 3)
val eventA = schedule(mark + 1.days)
val eventB = schedule(mark + 1.days)
assertWaitingFor(eventA)
testClock.advanceBy(1.days)
countDown.await()
assertThat(calls).isEqualTo(1)
backgroundExecutor.shutdown()
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
assertStarted(eventA)
assertStarted(eventB)
}
@Test
fun `test activity due in the future and schedule another for same time then unschedule second`() {
val eventA = schedule(mark + 1.days)
val eventB = schedule(mark + 1.days)
scheduler.unscheduleStateActivity(eventB.stateRef)
assertWaitingFor(eventA)
testClock.advanceBy(1.days)
assertStarted(eventA)
}
@Test
fun `test activity due in the future and schedule another for same time then unschedule original`() {
val time = stoppedClock.instant() + 1.days
val scheduledRef1 = scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time, 3)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
database.transaction {
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
}
val eventA = schedule(mark + 1.days)
val eventB = schedule(mark + 1.days)
scheduler.unscheduleStateActivity(eventA.stateRef)
assertWaitingFor(eventA) // XXX: Shouldn't it be waiting for eventB now?
testClock.advanceBy(1.days)
countDown.await()
assertThat(calls).isEqualTo(3)
backgroundExecutor.shutdown()
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
assertStarted(eventB)
}
@Test
fun `test activity due in the future then unschedule`() {
val scheduledRef1 = scheduleTX(stoppedClock.instant() + 1.days)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
database.transaction {
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
}
scheduler.unscheduleStateActivity(schedule(mark + 1.days).stateRef)
testClock.advanceBy(1.days)
assertThat(calls).isEqualTo(0)
backgroundExecutor.shutdown()
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
}
private fun scheduleTX(instant: Instant, increment: Int = 1): ScheduledStateRef? {
var scheduledRef: ScheduledStateRef? = null
database.transaction {
apply {
val freshKey = kms.freshKey()
val state = TestState(flowLogicRefFactory.createForRPC(TestFlowLogic::class.java, increment), instant, DUMMY_IDENTITY_1.party)
val builder = TransactionBuilder(null).apply {
addOutputState(state, DummyContract.PROGRAM_ID, DUMMY_NOTARY)
addCommand(Command(), freshKey)
}
val usefulTX = services.signInitialTransaction(builder, freshKey)
val txHash = usefulTX.id
services.recordTransactions(usefulTX)
scheduledRef = ScheduledStateRef(StateRef(txHash, 0), state.instant)
scheduler.scheduleStateActivity(scheduledRef!!)
}
}
return scheduledRef
}
}

View File

@ -1,5 +1,6 @@
package net.corda.testing.internal
import com.nhaarman.mockito_kotlin.doAnswer
import net.corda.core.crypto.Crypto
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.loggerFor
@ -108,3 +109,6 @@ fun createDevNodeCaCertPath(
val nodeCa = createDevNodeCa(intermediateCa, legalName)
return Triple(rootCa, intermediateCa, nodeCa)
}
/** Application of [doAnswer] that gets a value from the given [map] using the arg at [argIndex] as key. */
fun doLookup(map: Map<*, *>, argIndex: Int = 0) = doAnswer { map[it.arguments[argIndex]] }