mirror of
https://github.com/corda/corda.git
synced 2025-06-22 17:09:00 +00:00
Resuming from checkpoints now working - seller (but not buyer) in trader demo can be restarted
This commit is contained in:
@ -30,6 +30,8 @@ val Double.bd: BigDecimal get() = BigDecimal(this)
|
|||||||
val String.bd: BigDecimal get() = BigDecimal(this)
|
val String.bd: BigDecimal get() = BigDecimal(this)
|
||||||
val Long.bd: BigDecimal get() = BigDecimal(this)
|
val Long.bd: BigDecimal get() = BigDecimal(this)
|
||||||
|
|
||||||
|
fun String.abbreviate(maxWidth: Int): String = if (length <= maxWidth) this else take(maxWidth - 1) + "…"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a random positive long generated using a secure RNG. This function sacrifies a bit of entropy in order to
|
* Returns a random positive long generated using a secure RNG. This function sacrifies a bit of entropy in order to
|
||||||
* avoid potential bugs where the value is used in a context where negative numbers are not expected.
|
* avoid potential bugs where the value is used in a context where negative numbers are not expected.
|
||||||
|
@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ListenableFuture
|
|||||||
import com.r3corda.core.serialization.serialize
|
import com.r3corda.core.serialization.serialize
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.concurrent.Executor
|
import java.util.concurrent.Executor
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -68,14 +69,17 @@ interface MessagingService {
|
|||||||
* take the registration object, unlike the callback to [MessagingService.addMessageHandler].
|
* take the registration object, unlike the callback to [MessagingService.addMessageHandler].
|
||||||
*/
|
*/
|
||||||
fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? = null, callback: (Message) -> Unit) {
|
fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? = null, callback: (Message) -> Unit) {
|
||||||
|
val consumed = AtomicBoolean()
|
||||||
addMessageHandler(topic, executor) { msg, reg ->
|
addMessageHandler(topic, executor) { msg, reg ->
|
||||||
removeMessageHandler(reg)
|
removeMessageHandler(reg)
|
||||||
|
check(!consumed.getAndSet(true)) { "Called more than once" }
|
||||||
|
check(msg.topic == topic) { "Topic mismatch: ${msg.topic} vs $topic" }
|
||||||
callback(msg)
|
callback(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun MessagingService.send(topic: String, obj: Any, to: MessageRecipients) {
|
fun MessagingService.send(topic: String, payload: Any, to: MessageRecipients) {
|
||||||
send(createMessage(topic, obj.serialize().bits), to)
|
send(createMessage(topic, payload.serialize().bits), to)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3,12 +3,14 @@ package com.r3corda.core.serialization
|
|||||||
import co.paralleluniverse.fibers.Fiber
|
import co.paralleluniverse.fibers.Fiber
|
||||||
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||||
import com.esotericsoftware.kryo.Kryo
|
import com.esotericsoftware.kryo.Kryo
|
||||||
|
import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
|
||||||
import com.esotericsoftware.kryo.KryoException
|
import com.esotericsoftware.kryo.KryoException
|
||||||
import com.esotericsoftware.kryo.Serializer
|
import com.esotericsoftware.kryo.Serializer
|
||||||
import com.esotericsoftware.kryo.io.Input
|
import com.esotericsoftware.kryo.io.Input
|
||||||
import com.esotericsoftware.kryo.io.Output
|
import com.esotericsoftware.kryo.io.Output
|
||||||
import com.esotericsoftware.kryo.serializers.JavaSerializer
|
import com.esotericsoftware.kryo.serializers.JavaSerializer
|
||||||
import com.r3corda.core.contracts.*
|
import com.r3corda.core.contracts.*
|
||||||
|
import com.r3corda.core.crypto.Party
|
||||||
import com.r3corda.core.crypto.SecureHash
|
import com.r3corda.core.crypto.SecureHash
|
||||||
import com.r3corda.core.crypto.generateKeyPair
|
import com.r3corda.core.crypto.generateKeyPair
|
||||||
import com.r3corda.core.crypto.sha256
|
import com.r3corda.core.crypto.sha256
|
||||||
@ -17,6 +19,8 @@ import com.r3corda.core.node.services.AttachmentStorage
|
|||||||
import de.javakaffee.kryoserializers.ArraysAsListSerializer
|
import de.javakaffee.kryoserializers.ArraysAsListSerializer
|
||||||
import org.objenesis.strategy.StdInstantiatorStrategy
|
import org.objenesis.strategy.StdInstantiatorStrategy
|
||||||
import java.io.ByteArrayOutputStream
|
import java.io.ByteArrayOutputStream
|
||||||
|
import java.io.ObjectInputStream
|
||||||
|
import java.io.ObjectOutputStream
|
||||||
import java.lang.reflect.InvocationTargetException
|
import java.lang.reflect.InvocationTargetException
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
@ -252,7 +256,7 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
|
|||||||
isRegistrationRequired = false
|
isRegistrationRequired = false
|
||||||
// Allow construction of objects using a JVM backdoor that skips invoking the constructors, if there is no
|
// Allow construction of objects using a JVM backdoor that skips invoking the constructors, if there is no
|
||||||
// no-arg constructor available.
|
// no-arg constructor available.
|
||||||
instantiatorStrategy = Kryo.DefaultInstantiatorStrategy(StdInstantiatorStrategy())
|
instantiatorStrategy = DefaultInstantiatorStrategy(StdInstantiatorStrategy())
|
||||||
|
|
||||||
register(Arrays.asList("").javaClass, ArraysAsListSerializer());
|
register(Arrays.asList("").javaClass, ArraysAsListSerializer());
|
||||||
|
|
||||||
@ -262,18 +266,16 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
|
|||||||
register(Kryo::class.java, object : Serializer<Kryo>() {
|
register(Kryo::class.java, object : Serializer<Kryo>() {
|
||||||
override fun write(kryo: Kryo, output: Output, obj: Kryo) {
|
override fun write(kryo: Kryo, output: Output, obj: Kryo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun read(kryo: Kryo, input: Input, type: Class<Kryo>): Kryo {
|
override fun read(kryo: Kryo, input: Input, type: Class<Kryo>): Kryo {
|
||||||
return createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo)
|
return createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// Some things where the JRE provides an efficient custom serialisation.
|
// Some things where the JRE provides an efficient custom serialisation.
|
||||||
val ser = JavaSerializer()
|
|
||||||
val keyPair = generateKeyPair()
|
val keyPair = generateKeyPair()
|
||||||
register(keyPair.public.javaClass, ser)
|
register(keyPair.public.javaClass, ReferencesAwareJavaSerializer)
|
||||||
register(keyPair.private.javaClass, ser)
|
register(keyPair.private.javaClass, ReferencesAwareJavaSerializer)
|
||||||
register(Instant::class.java, ser)
|
register(Instant::class.java, ReferencesAwareJavaSerializer)
|
||||||
|
|
||||||
// Some classes have to be handled with the ImmutableClassSerializer because they need to have their
|
// Some classes have to be handled with the ImmutableClassSerializer because they need to have their
|
||||||
// constructors be invoked (typically for lazy members).
|
// constructors be invoked (typically for lazy members).
|
||||||
@ -286,6 +288,69 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
|
|||||||
register(SerializedBytes::class.java, SerializedBytesSerializer)
|
register(SerializedBytes::class.java, SerializedBytesSerializer)
|
||||||
|
|
||||||
addDefaultSerializer(SerializeAsToken::class.java, SerializeAsTokenSerializer<SerializeAsToken>())
|
addDefaultSerializer(SerializeAsToken::class.java, SerializeAsTokenSerializer<SerializeAsToken>())
|
||||||
|
|
||||||
|
// This is required to make all the unit tests pass
|
||||||
|
register(Party::class.java)
|
||||||
|
|
||||||
|
noReferencesWithin<WireTransaction>()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use this method to mark any types which can have the same instance within it more than once. This will make sure
|
||||||
|
* the serialised form is stable across multiple serialise-deserialise cycles. Using this on a type with internal cyclic
|
||||||
|
* references will throw a stack overflow exception during serialisation.
|
||||||
|
*/
|
||||||
|
inline fun <reified T : Any> Kryo.noReferencesWithin() {
|
||||||
|
register(T::class.java, NoReferencesSerializer(getSerializer(T::class.java)))
|
||||||
|
}
|
||||||
|
|
||||||
|
class NoReferencesSerializer<T>(val baseSerializer: Serializer<T>) : Serializer<T>() {
|
||||||
|
|
||||||
|
override fun read(kryo: Kryo, input: Input, type: Class<T>): T {
|
||||||
|
val previousValue = kryo.setReferences(false)
|
||||||
|
try {
|
||||||
|
return baseSerializer.read(kryo, input, type)
|
||||||
|
} finally {
|
||||||
|
kryo.references = previousValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun write(kryo: Kryo, output: Output, obj: T) {
|
||||||
|
val previousValue = kryo.setReferences(false)
|
||||||
|
try {
|
||||||
|
baseSerializer.write(kryo, output, obj)
|
||||||
|
} finally {
|
||||||
|
kryo.references = previousValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Improvement to the builtin JavaSerializer by honouring the [Kryo.getReferences] setting.
|
||||||
|
*/
|
||||||
|
object ReferencesAwareJavaSerializer : JavaSerializer() {
|
||||||
|
|
||||||
|
override fun write(kryo: Kryo, output: Output, obj: Any) {
|
||||||
|
if (kryo.references) {
|
||||||
|
super.write(kryo, output, obj)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ObjectOutputStream(output).use {
|
||||||
|
it.writeObject(obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun read(kryo: Kryo, input: Input, type: Class<Any>): Any {
|
||||||
|
return if (kryo.references) {
|
||||||
|
super.read(kryo, input, type)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ObjectInputStream(input).use {
|
||||||
|
it.readObject()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,25 +119,7 @@ class ProgressTracker(vararg steps: Step) {
|
|||||||
* Writable map that lets you insert child [ProgressTracker]s for particular steps. It's OK to edit this even
|
* Writable map that lets you insert child [ProgressTracker]s for particular steps. It's OK to edit this even
|
||||||
* after a progress tracker has been started.
|
* after a progress tracker has been started.
|
||||||
*/
|
*/
|
||||||
var childrenFor = object : HashMap<Step, ProgressTracker>() {
|
val childrenFor: ChildrenProgressTrackers = ChildrenProgressTrackersImpl()
|
||||||
override fun put(key: Step, value: ProgressTracker): ProgressTracker? {
|
|
||||||
val r = super.put(key, value)
|
|
||||||
childSubscriptions[value] = value.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) })
|
|
||||||
value.parent = this@ProgressTracker
|
|
||||||
_changes.onNext(Change.Structural(this@ProgressTracker, key))
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun remove(key: Step): ProgressTracker? {
|
|
||||||
val tracker = this[key]
|
|
||||||
if (tracker != null) {
|
|
||||||
tracker.parent = null
|
|
||||||
childSubscriptions[tracker]?.let { it.unsubscribe(); childSubscriptions.remove(tracker) }
|
|
||||||
}
|
|
||||||
_changes.onNext(Change.Structural(this@ProgressTracker, key))
|
|
||||||
return super.remove(key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** The parent of this tracker: set automatically by the parent when a tracker is added as a child */
|
/** The parent of this tracker: set automatically by the parent when a tracker is added as a child */
|
||||||
var parent: ProgressTracker? = null
|
var parent: ProgressTracker? = null
|
||||||
@ -150,8 +132,6 @@ class ProgressTracker(vararg steps: Step) {
|
|||||||
return cursor
|
return cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
private val childSubscriptions = HashMap<ProgressTracker, Subscription>()
|
|
||||||
|
|
||||||
private fun _allSteps(level: Int = 0): List<Pair<Int, Step>> {
|
private fun _allSteps(level: Int = 0): List<Pair<Int, Step>> {
|
||||||
val result = ArrayList<Pair<Int, Step>>()
|
val result = ArrayList<Pair<Int, Step>>()
|
||||||
for (step in steps) {
|
for (step in steps) {
|
||||||
@ -188,4 +168,37 @@ class ProgressTracker(vararg steps: Step) {
|
|||||||
* if a step changed its label or rendering).
|
* if a step changed its label or rendering).
|
||||||
*/
|
*/
|
||||||
val changes: Observable<Change> get() = _changes
|
val changes: Observable<Change> get() = _changes
|
||||||
|
|
||||||
|
|
||||||
|
// TODO remove this interface and add its three methods directly into ProgressTracker
|
||||||
|
interface ChildrenProgressTrackers {
|
||||||
|
operator fun get(step: ProgressTracker.Step): ProgressTracker?
|
||||||
|
operator fun set(step: ProgressTracker.Step, childProgressTracker: ProgressTracker)
|
||||||
|
fun remove(step: ProgressTracker.Step)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private inner class ChildrenProgressTrackersImpl : ChildrenProgressTrackers {
|
||||||
|
|
||||||
|
private val map = HashMap<Step, Pair<ProgressTracker, Subscription>>()
|
||||||
|
|
||||||
|
override fun get(step: Step): ProgressTracker? = map[step]?.first
|
||||||
|
|
||||||
|
override fun set(step: Step, childProgressTracker: ProgressTracker) {
|
||||||
|
val subscription = childProgressTracker.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) })
|
||||||
|
map[step] = Pair(childProgressTracker, subscription)
|
||||||
|
childProgressTracker.parent = this@ProgressTracker
|
||||||
|
_changes.onNext(Change.Structural(this@ProgressTracker, step))
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun remove(step: Step) {
|
||||||
|
map.remove(step)?.let {
|
||||||
|
it.first.parent = null
|
||||||
|
it.second.unsubscribe()
|
||||||
|
}
|
||||||
|
_changes.onNext(Change.Structural(this@ProgressTracker, step))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,34 +1,68 @@
|
|||||||
package com.r3corda.core.serialization
|
package com.r3corda.core.serialization
|
||||||
|
|
||||||
import com.esotericsoftware.kryo.Kryo
|
import com.google.common.primitives.Ints
|
||||||
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import kotlin.test.assertEquals
|
import java.util.*
|
||||||
import kotlin.test.assertNull
|
|
||||||
|
|
||||||
class KryoTests {
|
class KryoTests {
|
||||||
data class Person(val name: String, val birthday: Instant?)
|
|
||||||
|
|
||||||
private val kryo: Kryo = createKryo()
|
private val kryo = createKryo()
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun ok() {
|
fun ok() {
|
||||||
val april_17th = Instant.parse("1984-04-17T00:30:00.00Z")
|
val birthday = Instant.parse("1984-04-17T00:30:00.00Z")
|
||||||
val mike = Person("mike", april_17th)
|
val mike = Person("mike", birthday)
|
||||||
val bits = mike.serialize(kryo)
|
val bits = mike.serialize(kryo)
|
||||||
with(bits.deserialize<Person>(kryo)) {
|
assertThat(bits.deserialize(kryo)).isEqualTo(Person("mike", birthday))
|
||||||
assertEquals("mike", name)
|
|
||||||
assertEquals(april_17th, birthday)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun nullables() {
|
fun nullables() {
|
||||||
val bob = Person("bob", null)
|
val bob = Person("bob", null)
|
||||||
val bits = bob.serialize(kryo)
|
val bits = bob.serialize(kryo)
|
||||||
with(bits.deserialize<Person>(kryo)) {
|
assertThat(bits.deserialize(kryo)).isEqualTo(Person("bob", null))
|
||||||
assertEquals("bob", name)
|
|
||||||
assertNull(birthday)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `serialised form is stable when the same object instance is added to the deserialised object graph`() {
|
||||||
|
kryo.noReferencesWithin<ArrayList<*>>()
|
||||||
|
val obj = Ints.toByteArray(0x01234567).opaque()
|
||||||
|
val originalList = arrayListOf(obj)
|
||||||
|
val deserialisedList = originalList.serialize(kryo).deserialize(kryo)
|
||||||
|
originalList += obj
|
||||||
|
deserialisedList += obj
|
||||||
|
assertThat(deserialisedList.serialize(kryo)).isEqualTo(originalList.serialize(kryo))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `serialised form is stable when the same object instance occurs more than once, and using java serialisation`() {
|
||||||
|
kryo.noReferencesWithin<ArrayList<*>>()
|
||||||
|
val instant = Instant.ofEpochMilli(123)
|
||||||
|
val instantCopy = Instant.ofEpochMilli(123)
|
||||||
|
assertThat(instant).isNotSameAs(instantCopy)
|
||||||
|
val listWithCopies = arrayListOf(instant, instantCopy)
|
||||||
|
val listWithSameInstances = arrayListOf(instant, instant)
|
||||||
|
assertThat(listWithSameInstances.serialize(kryo)).isEqualTo(listWithCopies.serialize(kryo))
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `cyclic object graph`() {
|
||||||
|
val cyclic = Cyclic(3)
|
||||||
|
val bits = cyclic.serialize(kryo)
|
||||||
|
assertThat(bits.deserialize(kryo)).isEqualTo(cyclic)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private data class Person(val name: String, val birthday: Instant?)
|
||||||
|
|
||||||
|
@Suppress("unused")
|
||||||
|
private class Cyclic(val value: Int) {
|
||||||
|
val thisInstance = this
|
||||||
|
override fun equals(other: Any?): Boolean = (this === other) || (other is Cyclic && this.value == other.value)
|
||||||
|
override fun hashCode(): Int = value.hashCode()
|
||||||
|
override fun toString(): String = "Cyclic($value)"
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -20,9 +20,6 @@ import com.r3corda.node.services.api.AcceptsFileUpload
|
|||||||
import com.r3corda.node.services.api.CheckpointStorage
|
import com.r3corda.node.services.api.CheckpointStorage
|
||||||
import com.r3corda.node.services.api.MonitoringService
|
import com.r3corda.node.services.api.MonitoringService
|
||||||
import com.r3corda.node.services.api.ServiceHubInternal
|
import com.r3corda.node.services.api.ServiceHubInternal
|
||||||
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
|
|
||||||
import com.r3corda.node.services.transactions.NotaryService
|
|
||||||
import com.r3corda.node.services.transactions.TimestampChecker
|
|
||||||
import com.r3corda.node.services.clientapi.NodeInterestRates
|
import com.r3corda.node.services.clientapi.NodeInterestRates
|
||||||
import com.r3corda.node.services.config.NodeConfiguration
|
import com.r3corda.node.services.config.NodeConfiguration
|
||||||
import com.r3corda.node.services.identity.InMemoryIdentityService
|
import com.r3corda.node.services.identity.InMemoryIdentityService
|
||||||
@ -36,6 +33,9 @@ import com.r3corda.node.services.persistence.NodeAttachmentService
|
|||||||
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
|
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
|
||||||
import com.r3corda.node.services.persistence.StorageServiceImpl
|
import com.r3corda.node.services.persistence.StorageServiceImpl
|
||||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||||
|
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
|
||||||
|
import com.r3corda.node.services.transactions.NotaryService
|
||||||
|
import com.r3corda.node.services.transactions.TimestampChecker
|
||||||
import com.r3corda.node.services.wallet.NodeWalletService
|
import com.r3corda.node.services.wallet.NodeWalletService
|
||||||
import com.r3corda.node.utilities.AddOrRemove
|
import com.r3corda.node.utilities.AddOrRemove
|
||||||
import com.r3corda.node.utilities.AffinityExecutor
|
import com.r3corda.node.utilities.AffinityExecutor
|
||||||
@ -106,6 +106,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
|||||||
lateinit var identity: IdentityService
|
lateinit var identity: IdentityService
|
||||||
lateinit var net: MessagingService
|
lateinit var net: MessagingService
|
||||||
lateinit var api: APIServer
|
lateinit var api: APIServer
|
||||||
|
var isPreviousCheckpointsPresent = false
|
||||||
|
private set
|
||||||
|
|
||||||
/** Completes once the node has successfully registered with the network map service. Null until [start] returns. */
|
/** Completes once the node has successfully registered with the network map service. Null until [start] returns. */
|
||||||
@Volatile var networkMapRegistrationFuture: ListenableFuture<Unit>? = null
|
@Volatile var networkMapRegistrationFuture: ListenableFuture<Unit>? = null
|
||||||
@ -138,9 +140,10 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
|||||||
// This object doesn't need to be referenced from this class because it registers handlers on the network
|
// This object doesn't need to be referenced from this class because it registers handlers on the network
|
||||||
// service and so that keeps it from being collected.
|
// service and so that keeps it from being collected.
|
||||||
DataVendingService(net, storage)
|
DataVendingService(net, storage)
|
||||||
|
|
||||||
startMessagingService()
|
startMessagingService()
|
||||||
networkMapRegistrationFuture = registerWithNetworkMap()
|
networkMapRegistrationFuture = registerWithNetworkMap()
|
||||||
|
isPreviousCheckpointsPresent = checkpointStorage.checkpoints.any()
|
||||||
|
smm.start()
|
||||||
started = true
|
started = true
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,7 @@
|
|||||||
package com.r3corda.node.services.api
|
package com.r3corda.node.services.api
|
||||||
|
|
||||||
import com.r3corda.core.crypto.sha256
|
|
||||||
import com.r3corda.core.protocols.ProtocolStateMachine
|
|
||||||
import com.r3corda.core.serialization.SerializedBytes
|
import com.r3corda.core.serialization.SerializedBytes
|
||||||
|
import com.r3corda.node.services.statemachine.ProtocolStateMachineImpl
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread-safe storage of fiber checkpoints.
|
* Thread-safe storage of fiber checkpoints.
|
||||||
@ -33,11 +32,8 @@ interface CheckpointStorage {
|
|||||||
|
|
||||||
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
|
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
|
||||||
data class Checkpoint(
|
data class Checkpoint(
|
||||||
val serialisedFiber: SerializedBytes<out ProtocolStateMachine<*>>,
|
val serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
|
||||||
val awaitingTopic: String,
|
val awaitingTopic: String?,
|
||||||
val awaitingObjectOfType: String // java class name
|
val awaitingPayloadType: String?,
|
||||||
) {
|
val receivedPayload: Any?
|
||||||
override fun toString(): String {
|
)
|
||||||
return "Checkpoint(#serialisedFiber=${serialisedFiber.sha256()}, awaitingTopic=$awaitingTopic, awaitingObjectOfType=$awaitingObjectOfType)"
|
|
||||||
}
|
|
||||||
}
|
|
@ -125,9 +125,8 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
|||||||
val secManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, secConfig)
|
val secManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, secConfig)
|
||||||
mq.setSecurityManager(secManager)
|
mq.setSecurityManager(secManager)
|
||||||
|
|
||||||
// Currently we cannot find out if something goes wrong during startup :( This is bug ARTEMIS-388 filed by me.
|
// TODO Currently we cannot find out if something goes wrong during startup :( This is bug ARTEMIS-388 filed by me.
|
||||||
// The fix should be in the 1.3.0 release:
|
// The fix should be in the 1.3.0 release:
|
||||||
//
|
|
||||||
// https://issues.apache.org/jira/browse/ARTEMIS-388
|
// https://issues.apache.org/jira/browse/ARTEMIS-388
|
||||||
mq.start()
|
mq.start()
|
||||||
|
|
||||||
@ -137,12 +136,13 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
|||||||
|
|
||||||
// Create a queue on which to receive messages and set up the handler.
|
// Create a queue on which to receive messages and set up the handler.
|
||||||
session = clientFactory.createSession()
|
session = clientFactory.createSession()
|
||||||
|
|
||||||
session.createQueue(myHostPort.toString(), "inbound", false)
|
session.createQueue(myHostPort.toString(), "inbound", false)
|
||||||
inboundConsumer = session.createConsumer("inbound").setMessageHandler { message: ClientMessage ->
|
inboundConsumer = session.createConsumer("inbound").setMessageHandler { message: ClientMessage ->
|
||||||
// This code runs for every inbound message.
|
// This code runs for every inbound message.
|
||||||
try {
|
try {
|
||||||
if (!message.containsProperty(TOPIC_PROPERTY)) {
|
if (!message.containsProperty(TOPIC_PROPERTY)) {
|
||||||
log.warn("Received message without a ${TOPIC_PROPERTY} property, ignoring")
|
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
|
||||||
return@setMessageHandler
|
return@setMessageHandler
|
||||||
}
|
}
|
||||||
val topic = message.getStringProperty(TOPIC_PROPERTY)
|
val topic = message.getStringProperty(TOPIC_PROPERTY)
|
||||||
@ -160,6 +160,8 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
|||||||
|
|
||||||
deliverMessage(msg)
|
deliverMessage(msg)
|
||||||
} finally {
|
} finally {
|
||||||
|
// TODO the message is delivered onto an executor and so we may be acking the message before we've
|
||||||
|
// finished processing it
|
||||||
message.acknowledge()
|
message.acknowledge()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,11 +21,11 @@ import org.slf4j.LoggerFactory
|
|||||||
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
|
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
|
||||||
* logic element gets to return the value that the entire state machine resolves to.
|
* logic element gets to return the value that the entire state machine resolves to.
|
||||||
*/
|
*/
|
||||||
class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler, val loggerName: String) : Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
|
class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler, private val loggerName: String) : Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
|
||||||
|
|
||||||
// These fields shouldn't be serialised, so they are marked @Transient.
|
// These fields shouldn't be serialised, so they are marked @Transient.
|
||||||
@Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, fiber: ProtocolStateMachineImpl<*>) -> Unit)? = null
|
@Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, fiber: ProtocolStateMachineImpl<*>) -> Unit)? = null
|
||||||
@Transient private var resumeWithObject: Any? = null
|
@Transient private var receivedPayload: Any? = null
|
||||||
@Transient lateinit override var serviceHub: ServiceHubInternal
|
@Transient lateinit override var serviceHub: ServiceHubInternal
|
||||||
|
|
||||||
@Transient private var _logger: Logger? = null
|
@Transient private var _logger: Logger? = null
|
||||||
@ -52,11 +52,11 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberS
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun prepareForResumeWith(serviceHub: ServiceHubInternal,
|
fun prepareForResumeWith(serviceHub: ServiceHubInternal,
|
||||||
withObject: Any?,
|
receivedPayload: Any?,
|
||||||
suspendAction: (StateMachineManager.FiberRequest, ProtocolStateMachineImpl<*>) -> Unit) {
|
suspendAction: (StateMachineManager.FiberRequest, ProtocolStateMachineImpl<*>) -> Unit) {
|
||||||
this.suspendAction = suspendAction
|
|
||||||
this.resumeWithObject = withObject
|
|
||||||
this.serviceHub = serviceHub
|
this.serviceHub = serviceHub
|
||||||
|
this.receivedPayload = receivedPayload
|
||||||
|
this.suspendAction = suspendAction
|
||||||
}
|
}
|
||||||
|
|
||||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||||
@ -75,9 +75,10 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberS
|
|||||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||||
private fun <T : Any> suspendAndExpectReceive(with: StateMachineManager.FiberRequest): UntrustworthyData<T> {
|
private fun <T : Any> suspendAndExpectReceive(with: StateMachineManager.FiberRequest): UntrustworthyData<T> {
|
||||||
suspend(with)
|
suspend(with)
|
||||||
val tmp = resumeWithObject ?: throw IllegalStateException("Expected to receive something")
|
check(receivedPayload != null) { "Expected to receive something" }
|
||||||
resumeWithObject = null
|
val untrustworthy = UntrustworthyData(receivedPayload as T)
|
||||||
return UntrustworthyData(tmp as T)
|
receivedPayload = null
|
||||||
|
return untrustworthy
|
||||||
}
|
}
|
||||||
|
|
||||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||||
@ -102,7 +103,13 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberS
|
|||||||
@Suspendable
|
@Suspendable
|
||||||
private fun suspend(with: StateMachineManager.FiberRequest) {
|
private fun suspend(with: StateMachineManager.FiberRequest) {
|
||||||
parkAndSerialize { fiber, serializer ->
|
parkAndSerialize { fiber, serializer ->
|
||||||
|
try {
|
||||||
suspendAction!!(with, this)
|
suspendAction!!(with, this)
|
||||||
|
} catch (t: Throwable) {
|
||||||
|
logger.warn("Captured exception which was swallowed by Quasar", t)
|
||||||
|
// TODO to throw or not to throw, that is the question
|
||||||
|
throw t
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,14 +4,13 @@ import co.paralleluniverse.fibers.Fiber
|
|||||||
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||||
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||||
import com.codahale.metrics.Gauge
|
import com.codahale.metrics.Gauge
|
||||||
import com.esotericsoftware.kryo.io.Input
|
|
||||||
import com.google.common.base.Throwables
|
import com.google.common.base.Throwables
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
|
import com.r3corda.core.abbreviate
|
||||||
import com.r3corda.core.messaging.MessageRecipients
|
import com.r3corda.core.messaging.MessageRecipients
|
||||||
import com.r3corda.core.messaging.runOnNextMessage
|
import com.r3corda.core.messaging.runOnNextMessage
|
||||||
import com.r3corda.core.messaging.send
|
import com.r3corda.core.messaging.send
|
||||||
import com.r3corda.core.protocols.ProtocolLogic
|
import com.r3corda.core.protocols.ProtocolLogic
|
||||||
import com.r3corda.core.protocols.ProtocolStateMachine
|
|
||||||
import com.r3corda.core.serialization.*
|
import com.r3corda.core.serialization.*
|
||||||
import com.r3corda.core.then
|
import com.r3corda.core.then
|
||||||
import com.r3corda.core.utilities.ProgressTracker
|
import com.r3corda.core.utilities.ProgressTracker
|
||||||
@ -24,7 +23,6 @@ import java.io.PrintWriter
|
|||||||
import java.io.StringWriter
|
import java.io.StringWriter
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.Collections.synchronizedMap
|
import java.util.Collections.synchronizedMap
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -75,12 +73,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
|
|||||||
private val serializationContext = SerializeAsTokenContext(serviceHub)
|
private val serializationContext = SerializeAsTokenContext(serviceHub)
|
||||||
|
|
||||||
/** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */
|
/** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */
|
||||||
fun <T> findStateMachines(klass: Class<out ProtocolLogic<T>>): List<Pair<ProtocolLogic<T>, ListenableFuture<T>>> {
|
fun <P : ProtocolLogic<T>, T> findStateMachines(protocolClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
|
||||||
synchronized(stateMachines) {
|
synchronized(stateMachines) {
|
||||||
@Suppress("UNCHECKED_CAST")
|
@Suppress("UNCHECKED_CAST")
|
||||||
return stateMachines.keys
|
return stateMachines.keys
|
||||||
.map { it.logic }
|
.map { it.logic }
|
||||||
.filterIsInstance(klass)
|
.filterIsInstance(protocolClass)
|
||||||
.map { it to (it.psm as ProtocolStateMachineImpl<T>).resultFuture }
|
.map { it to (it.psm as ProtocolStateMachineImpl<T>).resultFuture }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -92,59 +90,56 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
|
|||||||
field.get(null)
|
field.get(null)
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
|
||||||
var restoreCheckpointsOnStart = true
|
|
||||||
}
|
|
||||||
|
|
||||||
init {
|
init {
|
||||||
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
|
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
|
||||||
(fiber as ProtocolStateMachineImpl<*>).logger.error("Caught exception from protocol", throwable)
|
(fiber as ProtocolStateMachineImpl<*>).logger.error("Caught exception from protocol", throwable)
|
||||||
}
|
}
|
||||||
if (restoreCheckpointsOnStart)
|
|
||||||
restoreCheckpoints()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Reads the database map and resurrects any serialised state machines. */
|
fun start() {
|
||||||
private fun restoreCheckpoints() {
|
checkpointStorage.checkpoints.forEach { restoreCheckpoint(it) }
|
||||||
for (checkpoint in checkpointStorage.checkpoints) {
|
}
|
||||||
// Grab the Kryo engine configured by Quasar for its own stuff, and then do our own configuration on top
|
|
||||||
// so we can deserialised the nested stream that holds the fiber.
|
private fun restoreCheckpoint(checkpoint: Checkpoint) {
|
||||||
val psm = deserializeFiber(checkpoint.serialisedFiber)
|
val fiber = deserializeFiber(checkpoint.serialisedFiber)
|
||||||
initFiber(psm, checkpoint)
|
initFiber(fiber, checkpoint)
|
||||||
val awaitingObjectOfType = Class.forName(checkpoint.awaitingObjectOfType)
|
|
||||||
val topic = checkpoint.awaitingTopic
|
val topic = checkpoint.awaitingTopic
|
||||||
|
if (topic != null) {
|
||||||
psm.logger.info("restored ${psm.logic} - was previously awaiting on topic $topic")
|
val awaitingPayloadType = Class.forName(checkpoint.awaitingPayloadType)
|
||||||
|
fiber.logger.info("Restored ${fiber.logic} - it was previously waiting for message of type ${awaitingPayloadType.name} on topic $topic")
|
||||||
// And now re-wire the deserialised continuation back up to the network service.
|
iterateOnResponse(fiber, awaitingPayloadType, checkpoint.serialisedFiber, topic) {
|
||||||
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
|
|
||||||
// TODO: See security note below.
|
|
||||||
val obj: Any = THREAD_LOCAL_KRYO.get().readClassAndObject(Input(netMsg.data))
|
|
||||||
if (!awaitingObjectOfType.isInstance(obj))
|
|
||||||
throw ClassCastException("Received message of unexpected type: ${obj.javaClass.name} vs ${awaitingObjectOfType.name}")
|
|
||||||
psm.logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
|
|
||||||
iterateStateMachine(psm, obj) {
|
|
||||||
try {
|
try {
|
||||||
Fiber.unparkDeserialized(it, scheduler)
|
Fiber.unparkDeserialized(fiber, scheduler)
|
||||||
} catch (e: Throwable) {
|
} catch (e: Throwable) {
|
||||||
logError(e, obj, topic, it)
|
logError(e, it, topic, fiber)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fiber.logger.info("Restored ${fiber.logic} - it was not waiting on any message; received payload: ${checkpoint.receivedPayload.toString().abbreviate(50)}")
|
||||||
|
executor.executeASAP {
|
||||||
|
iterateStateMachine(fiber, checkpoint.receivedPayload) {
|
||||||
|
try {
|
||||||
|
Fiber.unparkDeserialized(fiber, scheduler)
|
||||||
|
} catch (e: Throwable) {
|
||||||
|
logError(e, it, null, fiber)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun deserializeFiber(serialisedFiber: SerializedBytes<out ProtocolStateMachine<*>>): ProtocolStateMachineImpl<*> {
|
private fun deserializeFiber(serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>): ProtocolStateMachineImpl<*> {
|
||||||
val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer
|
val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer
|
||||||
val kryo = createKryo(deserializer.kryo)
|
val kryo = createKryo(deserializer.kryo)
|
||||||
// put the map of token -> tokenized into the kryo context
|
// put the map of token -> tokenized into the kryo context
|
||||||
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
|
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
|
||||||
return serialisedFiber.deserialize(kryo) as ProtocolStateMachineImpl<*>
|
return serialisedFiber.deserialize(kryo)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun logError(e: Throwable, obj: Any, topic: String, psm: ProtocolStateMachineImpl<*>) {
|
private fun logError(e: Throwable, payload: Any?, topic: String?, psm: ProtocolStateMachineImpl<*>) {
|
||||||
psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
|
psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
|
||||||
"when handling a message of type ${obj.javaClass.name} on topic $topic")
|
"when handling a message of type ${payload?.javaClass?.name} on topic $topic")
|
||||||
if (psm.logger.isTraceEnabled) {
|
if (psm.logger.isTraceEnabled) {
|
||||||
val s = StringWriter()
|
val s = StringWriter()
|
||||||
Throwables.getRootCause(e).printStackTrace(PrintWriter(s))
|
Throwables.getRootCause(e).printStackTrace(PrintWriter(s))
|
||||||
@ -152,11 +147,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
|
private fun initFiber(fiber: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
|
||||||
stateMachines[psm] = checkpoint
|
stateMachines[fiber] = checkpoint
|
||||||
psm.resultFuture.then(executor) {
|
fiber.resultFuture.then(executor) {
|
||||||
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
|
fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||||
val finalCheckpoint = stateMachines.remove(psm)
|
val finalCheckpoint = stateMachines.remove(fiber)
|
||||||
if (finalCheckpoint != null) {
|
if (finalCheckpoint != null) {
|
||||||
checkpointStorage.removeCheckpoint(finalCheckpoint)
|
checkpointStorage.removeCheckpoint(finalCheckpoint)
|
||||||
}
|
}
|
||||||
@ -176,7 +171,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
|
|||||||
initFiber(fiber, null)
|
initFiber(fiber, null)
|
||||||
executor.executeASAP {
|
executor.executeASAP {
|
||||||
iterateStateMachine(fiber, null) {
|
iterateStateMachine(fiber, null) {
|
||||||
it.start()
|
fiber.start()
|
||||||
}
|
}
|
||||||
totalStartedProtocols.inc()
|
totalStartedProtocols.inc()
|
||||||
}
|
}
|
||||||
@ -187,9 +182,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun replaceCheckpoint(psm: ProtocolStateMachineImpl<*>, newCheckpoint: Checkpoint) {
|
private fun updateCheckpoint(psm: ProtocolStateMachineImpl<*>,
|
||||||
// It's OK for this to be unsynchronised, as the prev/new byte arrays are specific to a continuation instance,
|
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
|
||||||
// and the underlying map provided by the database layer is expected to be thread safe.
|
awaitingTopic: String?,
|
||||||
|
awaitingPayloadType: Class<*>?,
|
||||||
|
receivedPayload: Any?) {
|
||||||
|
val newCheckpoint = Checkpoint(serialisedFiber, awaitingTopic, awaitingPayloadType?.name, receivedPayload)
|
||||||
val previousCheckpoint = stateMachines.put(psm, newCheckpoint)
|
val previousCheckpoint = stateMachines.put(psm, newCheckpoint)
|
||||||
if (previousCheckpoint != null) {
|
if (previousCheckpoint != null) {
|
||||||
checkpointStorage.removeCheckpoint(previousCheckpoint)
|
checkpointStorage.removeCheckpoint(previousCheckpoint)
|
||||||
@ -199,10 +197,20 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun iterateStateMachine(psm: ProtocolStateMachineImpl<*>,
|
private fun iterateStateMachine(psm: ProtocolStateMachineImpl<*>,
|
||||||
obj: Any?,
|
receivedPayload: Any?,
|
||||||
resumeFunc: (ProtocolStateMachineImpl<*>) -> Unit) {
|
resumeAction: (Any?) -> Unit) {
|
||||||
executor.checkOnThread()
|
executor.checkOnThread()
|
||||||
val onSuspend = fun(request: FiberRequest, fiber: ProtocolStateMachineImpl<*>) {
|
psm.prepareForResumeWith(serviceHub, receivedPayload) { request, serialisedFiber ->
|
||||||
|
psm.logger.trace { "Suspended fiber ${psm.id} ${psm.logic}" }
|
||||||
|
onNextSuspend(psm, request, serialisedFiber)
|
||||||
|
}
|
||||||
|
psm.logger.trace { "Waking up fiber ${psm.id} ${psm.logic}" }
|
||||||
|
resumeAction(receivedPayload)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun onNextSuspend(psm: ProtocolStateMachineImpl<*>,
|
||||||
|
request: FiberRequest,
|
||||||
|
fiber: ProtocolStateMachineImpl<*>) {
|
||||||
// We have a request to do something: send, receive, or send-and-receive.
|
// We have a request to do something: send, receive, or send-and-receive.
|
||||||
if (request is FiberRequest.ExpectingResponse<*>) {
|
if (request is FiberRequest.ExpectingResponse<*>) {
|
||||||
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
|
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
|
||||||
@ -211,69 +219,72 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
|
|||||||
// add the map of tokens -> tokenizedServices to the kyro context
|
// add the map of tokens -> tokenizedServices to the kyro context
|
||||||
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
|
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
|
||||||
val serialisedFiber = fiber.serialize(kryo)
|
val serialisedFiber = fiber.serialize(kryo)
|
||||||
// Prepare a listener on the network that runs in the background thread when we received a message.
|
// Prepare a listener on the network that runs in the background thread when we receive a message.
|
||||||
checkpointAndSetupMessageHandler(psm, request, serialisedFiber)
|
checkpointOnExpectingResponse(psm, request, serialisedFiber)
|
||||||
}
|
}
|
||||||
// If an object to send was provided (not null), send it now.
|
// If a non-null payload to send was provided, send it now.
|
||||||
request.obj?.let {
|
request.payload?.let {
|
||||||
val topic = "${request.topic}.${request.sessionIDForSend}"
|
val topic = "${request.topic}.${request.sessionIDForSend}"
|
||||||
psm.logger.trace { "-> ${request.destination}/$topic : message of type ${it.javaClass.name}" }
|
psm.logger.trace { "Sending message of type ${it.javaClass.name} using topic $topic to ${request.destination} (${it.toString().abbreviate(50)})" }
|
||||||
serviceHub.networkService.send(topic, it, request.destination!!)
|
serviceHub.networkService.send(topic, it, request.destination!!)
|
||||||
}
|
}
|
||||||
if (request is FiberRequest.NotExpectingResponse) {
|
if (request is FiberRequest.NotExpectingResponse) {
|
||||||
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
||||||
iterateStateMachine(psm, null) {
|
iterateStateMachine(psm, null) {
|
||||||
try {
|
try {
|
||||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
Fiber.unpark(psm, QUASAR_UNBLOCKER)
|
||||||
} catch(e: Throwable) {
|
} catch(e: Throwable) {
|
||||||
logError(e, request.obj!!, request.topic, it)
|
logError(e, request.payload, request.topic, psm)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
psm.prepareForResumeWith(serviceHub, obj, onSuspend)
|
private fun checkpointOnExpectingResponse(psm: ProtocolStateMachineImpl<*>,
|
||||||
|
|
||||||
resumeFunc(psm)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun checkpointAndSetupMessageHandler(psm: ProtocolStateMachineImpl<*>,
|
|
||||||
request: FiberRequest.ExpectingResponse<*>,
|
request: FiberRequest.ExpectingResponse<*>,
|
||||||
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) {
|
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) {
|
||||||
executor.checkOnThread()
|
executor.checkOnThread()
|
||||||
val topic = "${request.topic}.${request.sessionIDForReceive}"
|
val topic = "${request.topic}.${request.sessionIDForReceive}"
|
||||||
val newCheckpoint = Checkpoint(serialisedFiber, topic, request.responseType.name)
|
updateCheckpoint(psm, serialisedFiber, topic, request.responseType, null)
|
||||||
replaceCheckpoint(psm, newCheckpoint)
|
psm.logger.trace { "Preparing to receive message of type ${request.responseType.name} on topic $topic" }
|
||||||
psm.logger.trace { "Waiting for message of type ${request.responseType.name} on $topic" }
|
iterateOnResponse(psm, request.responseType, serialisedFiber, topic) {
|
||||||
val consumed = AtomicBoolean()
|
try {
|
||||||
|
Fiber.unpark(psm, QUASAR_UNBLOCKER)
|
||||||
|
} catch(e: Throwable) {
|
||||||
|
logError(e, it, topic, psm)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun iterateOnResponse(psm: ProtocolStateMachineImpl<*>,
|
||||||
|
responseType: Class<*>,
|
||||||
|
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
|
||||||
|
topic: String,
|
||||||
|
resumeAction: (Any?) -> Unit) {
|
||||||
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
|
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
|
||||||
// Some assertions to ensure we don't execute on the wrong thread or get executed more than once.
|
// Assertion to ensure we don't execute on the wrong thread.
|
||||||
executor.checkOnThread()
|
executor.checkOnThread()
|
||||||
check(netMsg.topic == topic) { "Topic mismatch: ${netMsg.topic} vs $topic" }
|
|
||||||
check(!consumed.getAndSet(true))
|
|
||||||
// TODO: This is insecure: we should not deserialise whatever we find and *then* check.
|
// TODO: This is insecure: we should not deserialise whatever we find and *then* check.
|
||||||
//
|
|
||||||
// We should instead verify as we read the data that it's what we are expecting and throw as early as
|
// We should instead verify as we read the data that it's what we are expecting and throw as early as
|
||||||
// possible. We only do it this way for convenience during the prototyping stage. Note that this means
|
// possible. We only do it this way for convenience during the prototyping stage. Note that this means
|
||||||
// we could simply not require the programmer to specify the expected return type at all, and catch it
|
// we could simply not require the programmer to specify the expected return type at all, and catch it
|
||||||
// at the last moment when we do the downcast. However this would make protocol code harder to read and
|
// at the last moment when we do the downcast. However this would make protocol code harder to read and
|
||||||
// make it more difficult to migrate to a more explicit serialisation scheme later.
|
// make it more difficult to migrate to a more explicit serialisation scheme later.
|
||||||
val obj: Any = THREAD_LOCAL_KRYO.get().readClassAndObject(Input(netMsg.data))
|
val payload = netMsg.data.deserialize<Any>()
|
||||||
if (!request.responseType.isInstance(obj))
|
check(responseType.isInstance(payload)) { "Expected message of type ${responseType.name} but got ${payload.javaClass.name}" }
|
||||||
throw IllegalStateException("Expected message of type ${request.responseType.name} but got ${obj.javaClass.name}", request.stackTraceInCaseOfProblems)
|
// Update the fiber's checkpoint so that it's no longer waiting on a response, but rather has the received payload
|
||||||
iterateStateMachine(psm, obj) {
|
updateCheckpoint(psm, serialisedFiber, null, null, payload)
|
||||||
try {
|
psm.logger.trace { "Received message of type ${payload.javaClass.name} on topic $topic (${payload.toString().abbreviate(50)})" }
|
||||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
iterateStateMachine(psm, payload, resumeAction)
|
||||||
} catch(e: Throwable) {
|
|
||||||
logError(e, obj, topic, it)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Clean this up
|
// TODO: Clean this up
|
||||||
open class FiberRequest(val topic: String, val destination: MessageRecipients?,
|
open class FiberRequest(val topic: String,
|
||||||
val sessionIDForSend: Long, val sessionIDForReceive: Long, val obj: Any?) {
|
val destination: MessageRecipients?,
|
||||||
|
val sessionIDForSend: Long,
|
||||||
|
val sessionIDForReceive: Long,
|
||||||
|
val payload: Any?) {
|
||||||
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
|
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
|
||||||
// don't have the original stack trace because it's in a suspended fiber.
|
// don't have the original stack trace because it's in a suspended fiber.
|
||||||
val stackTraceInCaseOfProblems = StackSnapshot()
|
val stackTraceInCaseOfProblems = StackSnapshot()
|
||||||
|
@ -28,11 +28,11 @@ import com.r3corda.node.services.persistence.StorageServiceImpl
|
|||||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||||
import com.r3corda.node.services.wallet.NodeWalletService
|
import com.r3corda.node.services.wallet.NodeWalletService
|
||||||
import com.r3corda.node.services.wallet.WalletImpl
|
import com.r3corda.node.services.wallet.WalletImpl
|
||||||
|
import com.r3corda.protocols.TwoPartyTradeProtocol
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
import org.junit.Before
|
import org.junit.Before
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import com.r3corda.protocols.TwoPartyTradeProtocol
|
|
||||||
import java.io.ByteArrayInputStream
|
import java.io.ByteArrayInputStream
|
||||||
import java.io.ByteArrayOutputStream
|
import java.io.ByteArrayOutputStream
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
@ -218,6 +218,9 @@ class TwoPartyTradeProtocolTests {
|
|||||||
assertEquals(bobFuture.get(), aliceFuture.get())
|
assertEquals(bobFuture.get(), aliceFuture.get())
|
||||||
|
|
||||||
assertThat(bobNode.smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java)).isEmpty()
|
assertThat(bobNode.smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java)).isEmpty()
|
||||||
|
|
||||||
|
assertThat(bobNode.checkpointStorage.checkpoints).isEmpty()
|
||||||
|
assertThat(aliceNode.checkpointStorage.checkpoints).isEmpty()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,6 +93,6 @@ class PerFileCheckpointStorageTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private var checkpointCount = 1
|
private var checkpointCount = 1
|
||||||
private fun newCheckpoint() = Checkpoint(SerializedBytes(Ints.toByteArray(checkpointCount++)), "topic", "javaType")
|
private fun newCheckpoint() = Checkpoint(SerializedBytes(Ints.toByteArray(checkpointCount++)), "topic", "javaType", null)
|
||||||
|
|
||||||
}
|
}
|
@ -24,7 +24,6 @@ import com.r3corda.node.services.config.NodeConfigurationFromConfig
|
|||||||
import com.r3corda.node.services.messaging.ArtemisMessagingService
|
import com.r3corda.node.services.messaging.ArtemisMessagingService
|
||||||
import com.r3corda.node.services.network.NetworkMapService
|
import com.r3corda.node.services.network.NetworkMapService
|
||||||
import com.r3corda.node.services.persistence.NodeAttachmentService
|
import com.r3corda.node.services.persistence.NodeAttachmentService
|
||||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
|
||||||
import com.r3corda.node.services.transactions.NotaryService
|
import com.r3corda.node.services.transactions.NotaryService
|
||||||
import com.r3corda.node.services.wallet.NodeWalletService
|
import com.r3corda.node.services.wallet.NodeWalletService
|
||||||
import com.r3corda.node.utilities.ANSIProgressRenderer
|
import com.r3corda.node.utilities.ANSIProgressRenderer
|
||||||
@ -128,9 +127,6 @@ fun main(args: Array<String>) {
|
|||||||
NodeInfo(ArtemisMessagingService.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type))
|
NodeInfo(ArtemisMessagingService.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Remove this once checkpoint resume works.
|
|
||||||
StateMachineManager.restoreCheckpointsOnStart = false
|
|
||||||
|
|
||||||
// And now construct then start the node object. It takes a little while.
|
// And now construct then start the node object. It takes a little while.
|
||||||
val node = logElapsedTime("Node startup") {
|
val node = logElapsedTime("Node startup") {
|
||||||
Node(directory, myNetAddr, config, networkMapId, advertisedServices).start()
|
Node(directory, myNetAddr, config, networkMapId, advertisedServices).start()
|
||||||
@ -175,10 +171,18 @@ fun runSeller(myNetAddr: HostAndPort, node: Node, theirNetAddr: HostAndPort) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (node.isPreviousCheckpointsPresent) {
|
||||||
|
node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).forEach {
|
||||||
|
ANSIProgressRenderer.progressTracker = it.first.progressTracker
|
||||||
|
it.second.get()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
val otherSide = ArtemisMessagingService.makeRecipient(theirNetAddr)
|
val otherSide = ArtemisMessagingService.makeRecipient(theirNetAddr)
|
||||||
val seller = TraderDemoProtocolSeller(myNetAddr, otherSide)
|
val seller = TraderDemoProtocolSeller(myNetAddr, otherSide)
|
||||||
ANSIProgressRenderer.progressTracker = seller.progressTracker
|
ANSIProgressRenderer.progressTracker = seller.progressTracker
|
||||||
node.smm.add("demo.seller", seller).get()
|
node.smm.add("demo.seller", seller).get()
|
||||||
|
}
|
||||||
|
|
||||||
node.stop()
|
node.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,11 +194,18 @@ fun runBuyer(node: Node) {
|
|||||||
it.storePath
|
it.storePath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val future = if (node.isPreviousCheckpointsPresent) {
|
||||||
|
val (buyer, future) = node.smm.findStateMachines(TraderDemoProtocolBuyer::class.java).single()
|
||||||
|
ANSIProgressRenderer.progressTracker = buyer.progressTracker //TODO the SMM will soon be able to wire up the ANSIProgressRenderer automatially
|
||||||
|
future
|
||||||
|
} else {
|
||||||
// We use a simple scenario-specific wrapper protocol to make things happen.
|
// We use a simple scenario-specific wrapper protocol to make things happen.
|
||||||
val buyer = TraderDemoProtocolBuyer(attachmentsPath, node.info.identity)
|
val buyer = TraderDemoProtocolBuyer(attachmentsPath, node.info.identity)
|
||||||
ANSIProgressRenderer.progressTracker = buyer.progressTracker
|
ANSIProgressRenderer.progressTracker = buyer.progressTracker
|
||||||
// This thread will halt forever here.
|
node.smm.add("demo.buyer", buyer)
|
||||||
node.smm.add("demo.buyer", buyer).get()
|
}
|
||||||
|
|
||||||
|
future.get() // This thread will halt forever here.
|
||||||
}
|
}
|
||||||
|
|
||||||
// We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic.
|
// We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic.
|
||||||
|
Reference in New Issue
Block a user