Refactor the protocol framework out into separate files.

This commit is contained in:
Mike Hearn 2016-02-19 16:08:46 +01:00
parent 9b28521389
commit 9de104df7b
10 changed files with 229 additions and 182 deletions

View File

@ -11,9 +11,9 @@ package contracts.protocols
import co.paralleluniverse.fibers.Suspendable
import core.SignedTransaction
import core.crypto.SecureHash
import core.messaging.ProtocolLogic
import core.protocols.ProtocolLogic
import core.messaging.SingleMessageRecipient
import core.messaging.UntrustworthyData
import core.utilities.UntrustworthyData
import core.node.DataVendingService
import core.random63BitValue
import java.util.*

View File

@ -14,7 +14,7 @@ import core.SignedTransaction
import core.TransactionGroup
import core.WireTransaction
import core.crypto.SecureHash
import core.messaging.ProtocolLogic
import core.protocols.ProtocolLogic
import core.messaging.SingleMessageRecipient
import java.util.*

View File

@ -16,7 +16,7 @@ import core.*
import core.crypto.DigitalSignature
import core.crypto.signWithECDSA
import core.messaging.LegallyIdentifiableNode
import core.messaging.ProtocolLogic
import core.protocols.ProtocolLogic
import core.messaging.SingleMessageRecipient
import core.messaging.StateMachineManager
import core.node.TimestampingProtocol

View File

@ -10,17 +10,16 @@ package core.messaging
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.google.common.base.Throwables
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import core.ServiceHub
import core.crypto.SecureHash
import core.crypto.sha256
import core.protocols.ProtocolLogic
import core.protocols.ProtocolStateMachine
import core.serialization.THREAD_LOCAL_KRYO
import core.serialization.createKryo
import core.serialization.deserialize
@ -28,7 +27,6 @@ import core.serialization.serialize
import core.utilities.trace
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
import java.io.PrintWriter
import java.io.StringWriter
import java.util.*
@ -232,181 +230,22 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
}
}
}
}
object SameThreadFiberScheduler : FiberExecutorScheduler("Same thread scheduler", MoreExecutors.directExecutor())
object SameThreadFiberScheduler : FiberExecutorScheduler("Same thread scheduler", MoreExecutors.directExecutor())
/**
* A sub-class of [ProtocolLogic<T>] implements a protocol flow using direct, straight line blocking code. Thus you
* can write complex protocol logic in an ordinary fashion, without having to think about callbacks, restarting after
* a node crash, how many instances of your protocol there are running and so on.
*
* Invoking the network will cause the call stack to be suspended onto the heap and then serialized to a database using
* the Quasar fibers framework. Because of this, if you need access to data that might change over time, you should
* request it just-in-time via the [serviceHub] property which is provided. Don't try and keep data you got from a
* service across calls to send/receive/sendAndReceive because the world might change in arbitrary ways out from
* underneath you, for instance, if the node is restarted or reconfigured!
*
* Additionally, be aware of what data you pin either via the stack or in your [ProtocolLogic] implementation. Very large
* objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.
*
* If you'd like to use another ProtocolLogic class as a component of your own, construct it on the fly and then pass
* it to the [subProtocol] method. It will return the result of that protocol when it completes.
*/
abstract class ProtocolLogic<T> {
/** Reference to the [Fiber] instance that is the top level controller for the entire flow. */
lateinit var psm: ProtocolStateMachine<*>
// TODO: Clean this up
open class FiberRequest(val topic: String, val destination: MessageRecipients?,
val sessionIDForSend: Long, val sessionIDForReceive: Long, val obj: Any?) {
class ExpectingResponse<R : Any>(
topic: String,
destination: MessageRecipients?,
sessionIDForSend: Long,
sessionIDForReceive: Long,
obj: Any?,
val responseType: Class<R>
) : FiberRequest(topic, destination, sessionIDForSend, sessionIDForReceive, obj)
/** This is where you should log things to. */
val logger: Logger get() = psm.logger
/** Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts */
val serviceHub: ServiceHub get() = psm.serviceHub
// Kotlin helpers that allow the use of generic types.
inline fun <reified T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long,
sessionIDForReceive: Long, obj: Any): UntrustworthyData<T> {
return psm.sendAndReceive(topic, destination, sessionIDForSend, sessionIDForReceive, obj, T::class.java)
}
inline fun <reified T : Any> receive(topic: String, sessionIDForReceive: Long): UntrustworthyData<T> {
return psm.receive(topic, sessionIDForReceive, T::class.java)
}
@Suspendable fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) {
psm.send(topic, destination, sessionID, obj)
}
/**
* Invokes the given subprotocol by simply passing through this [ProtocolLogic]s reference to the
* [ProtocolStateMachine] and then calling the [call] method.
*/
@Suspendable fun <R> subProtocol(subLogic: ProtocolLogic<R>): R {
subLogic.psm = psm
return subLogic.call()
}
@Suspendable
abstract fun call(): T
}
/**
* A ProtocolStateMachine instance is a suspendable fiber that delegates all actual logic to a [ProtocolLogic] instance.
* For any given flow there is only one PSM, even if that protocol invokes subprotocols.
*
* These classes are created by the [StateMachineManager] when a new protocol is started at the topmost level. If
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
* logic element gets to return the value that the entire state machine resolves to.
*/
class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>) : Fiber<R>("protocol", SameThreadFiberScheduler) {
// These fields shouldn't be serialised, so they are marked @Transient.
@Transient private var suspendFunc: ((result: FiberRequest, serFiber: ByteArray) -> Unit)? = null
@Transient private var resumeWithObject: Any? = null
@Transient lateinit var serviceHub: ServiceHub
@Transient lateinit var logger: Logger
init {
logic.psm = this
}
fun prepareForResumeWith(serviceHub: ServiceHub, withObject: Any?, logger: Logger,
suspendFunc: (FiberRequest, ByteArray) -> Unit) {
this.suspendFunc = suspendFunc
this.logger = logger
this.resumeWithObject = withObject
this.serviceHub = serviceHub
}
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R> get() {
return _resultFuture ?: run {
val f = SettableFuture.create<R>()
_resultFuture = f
return f
}
}
@Suspendable @Suppress("UNCHECKED_CAST")
override fun run(): R {
try {
val result = logic.call()
if (result != null)
_resultFuture?.set(result)
return result
} catch (e: Throwable) {
_resultFuture?.setException(e)
throw e
}
}
@Suspendable @Suppress("UNCHECKED_CAST")
private fun <T : Any> suspendAndExpectReceive(with: FiberRequest): UntrustworthyData<T> {
Fiber.parkAndSerialize { fiber, serializer ->
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
val deserializer = Fiber.getFiberSerializer() as KryoSerializer
val kryo = createKryo(deserializer.kryo)
val stream = ByteArrayOutputStream()
Output(stream).use {
kryo.writeClassAndObject(it, this)
}
suspendFunc!!(with, stream.toByteArray())
}
val tmp = resumeWithObject ?: throw IllegalStateException("Expected to receive something")
resumeWithObject = null
return UntrustworthyData(tmp as T)
}
@Suspendable @Suppress("UNCHECKED_CAST")
fun <T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, sessionIDForReceive: Long,
obj: Any, recvType: Class<T>): UntrustworthyData<T> {
val result = FiberRequest.ExpectingResponse(topic, destination, sessionIDForSend, sessionIDForReceive, obj, recvType)
return suspendAndExpectReceive(result)
}
@Suspendable
fun <T : Any> receive(topic: String, sessionIDForReceive: Long, recvType: Class<T>): UntrustworthyData<T> {
val result = FiberRequest.ExpectingResponse(topic, null, -1, sessionIDForReceive, null, recvType)
return suspendAndExpectReceive(result)
}
@Suspendable
fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) {
val result = FiberRequest.NotExpectingResponse(topic, destination, sessionID, obj)
Fiber.parkAndSerialize { fiber, writer -> suspendFunc!!(result, writer.write(fiber)) }
}
}
/**
* A small utility to approximate taint tracking: if a method gives you back one of these, it means the data came from
* a remote source that may be incentivised to pass us junk that violates basic assumptions and thus must be checked
* first. The wrapper helps you to avoid forgetting this vital step. Things you might want to check are:
*
* - Is this object the one you actually expected? Did the other side hand you back something technically valid but
* not what you asked for?
* - Is the object disobeying its own invariants?
* - Are any objects *reachable* from this object mismatched or not what you expected?
* - Is it suspiciously large or small?
*/
class UntrustworthyData<T>(private val fromUntrustedWorld: T) {
val data: T
@Deprecated("Accessing the untrustworthy data directly without validating it first is a bad idea")
get() = fromUntrustedWorld
@Suppress("DEPRECATION")
inline fun <R> validate(validator: (T) -> R) = validator(data)
}
// TODO: Clean this up
open class FiberRequest(val topic: String, val destination: MessageRecipients?,
val sessionIDForSend: Long, val sessionIDForReceive: Long, val obj: Any?) {
class ExpectingResponse<R : Any>(
topic: String,
destination: MessageRecipients?,
sessionIDForSend: Long,
sessionIDForReceive: Long,
obj: Any?,
val responseType: Class<R>
) : FiberRequest(topic, destination, sessionIDForSend, sessionIDForReceive, obj)
class NotExpectingResponse(topic: String, destination: MessageRecipients, sessionIDForSend: Long, obj: Any?)
class NotExpectingResponse(topic: String, destination: MessageRecipients, sessionIDForSend: Long, obj: Any?)
: FiberRequest(topic, destination, sessionIDForSend, -1, obj)
}
}

View File

@ -14,6 +14,7 @@ import core.*
import core.crypto.DigitalSignature
import core.crypto.signWithECDSA
import core.messaging.*
import core.protocols.ProtocolLogic
import core.serialization.SerializedBytes
import core.serialization.deserialize
import core.serialization.serialize

View File

@ -15,7 +15,7 @@ import contracts.protocols.TwoPartyTradeProtocol
import core.*
import core.crypto.generateKeyPair
import core.messaging.LegallyIdentifiableNode
import core.messaging.ProtocolLogic
import core.protocols.ProtocolLogic
import core.messaging.SingleMessageRecipient
import core.serialization.deserialize
import core.utilities.BriefLogFormatter

View File

@ -0,0 +1,66 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.protocols
import co.paralleluniverse.fibers.Suspendable
import core.ServiceHub
import core.messaging.MessageRecipients
import core.utilities.UntrustworthyData
import org.slf4j.Logger
/**
* A sub-class of [ProtocolLogic<T>] implements a protocol flow using direct, straight line blocking code. Thus you
* can write complex protocol logic in an ordinary fashion, without having to think about callbacks, restarting after
* a node crash, how many instances of your protocol there are running and so on.
*
* Invoking the network will cause the call stack to be suspended onto the heap and then serialized to a database using
* the Quasar fibers framework. Because of this, if you need access to data that might change over time, you should
* request it just-in-time via the [serviceHub] property which is provided. Don't try and keep data you got from a
* service across calls to send/receive/sendAndReceive because the world might change in arbitrary ways out from
* underneath you, for instance, if the node is restarted or reconfigured!
*
* Additionally, be aware of what data you pin either via the stack or in your [ProtocolLogic] implementation. Very large
* objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.
*
* If you'd like to use another ProtocolLogic class as a component of your own, construct it on the fly and then pass
* it to the [subProtocol] method. It will return the result of that protocol when it completes.
*/
abstract class ProtocolLogic<T> {
/** Reference to the [Fiber] instance that is the top level controller for the entire flow. */
lateinit var psm: ProtocolStateMachine<*>
/** This is where you should log things to. */
val logger: Logger get() = psm.logger
/** Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts */
val serviceHub: ServiceHub get() = psm.serviceHub
// Kotlin helpers that allow the use of generic types.
inline fun <reified T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long,
sessionIDForReceive: Long, obj: Any): UntrustworthyData<T> {
return psm.sendAndReceive(topic, destination, sessionIDForSend, sessionIDForReceive, obj, T::class.java)
}
inline fun <reified T : Any> receive(topic: String, sessionIDForReceive: Long): UntrustworthyData<T> {
return psm.receive(topic, sessionIDForReceive, T::class.java)
}
@Suspendable fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) {
psm.send(topic, destination, sessionID, obj)
}
/**
* Invokes the given subprotocol by simply passing through this [ProtocolLogic]s reference to the
* [ProtocolStateMachine] and then calling the [call] method.
*/
@Suspendable fun <R> subProtocol(subLogic: ProtocolLogic<R>): R {
subLogic.psm = psm
return subLogic.call()
}
@Suspendable
abstract fun call(): T
}

View File

@ -0,0 +1,111 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.protocols
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.esotericsoftware.kryo.io.Output
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import core.ServiceHub
import core.messaging.MessageRecipients
import core.messaging.StateMachineManager
import core.serialization.createKryo
import core.utilities.UntrustworthyData
import org.slf4j.Logger
import java.io.ByteArrayOutputStream
/**
* A ProtocolStateMachine instance is a suspendable fiber that delegates all actual logic to a [ProtocolLogic] instance.
* For any given flow there is only one PSM, even if that protocol invokes subprotocols.
*
* These classes are created by the [StateMachineManager] when a new protocol is started at the topmost level. If
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
* logic element gets to return the value that the entire state machine resolves to.
*/
class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>) : Fiber<R>("protocol", StateMachineManager.SameThreadFiberScheduler) {
// These fields shouldn't be serialised, so they are marked @Transient.
@Transient private var suspendFunc: ((result: StateMachineManager.FiberRequest, serFiber: ByteArray) -> Unit)? = null
@Transient private var resumeWithObject: Any? = null
@Transient lateinit var serviceHub: ServiceHub
@Transient lateinit var logger: Logger
init {
logic.psm = this
}
fun prepareForResumeWith(serviceHub: ServiceHub, withObject: Any?, logger: Logger,
suspendFunc: (StateMachineManager.FiberRequest, ByteArray) -> Unit) {
this.suspendFunc = suspendFunc
this.logger = logger
this.resumeWithObject = withObject
this.serviceHub = serviceHub
}
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R> get() {
return _resultFuture ?: run {
val f = SettableFuture.create<R>()
_resultFuture = f
return f
}
}
@Suspendable @Suppress("UNCHECKED_CAST")
override fun run(): R {
try {
val result = logic.call()
if (result != null)
_resultFuture?.set(result)
return result
} catch (e: Throwable) {
_resultFuture?.setException(e)
throw e
}
}
@Suspendable @Suppress("UNCHECKED_CAST")
private fun <T : Any> suspendAndExpectReceive(with: StateMachineManager.FiberRequest): UntrustworthyData<T> {
parkAndSerialize { fiber, serializer ->
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
val deserializer = getFiberSerializer() as KryoSerializer
val kryo = createKryo(deserializer.kryo)
val stream = ByteArrayOutputStream()
Output(stream).use {
kryo.writeClassAndObject(it, this)
}
suspendFunc!!(with, stream.toByteArray())
}
val tmp = resumeWithObject ?: throw IllegalStateException("Expected to receive something")
resumeWithObject = null
return UntrustworthyData(tmp as T)
}
@Suspendable @Suppress("UNCHECKED_CAST")
fun <T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, sessionIDForReceive: Long,
obj: Any, recvType: Class<T>): UntrustworthyData<T> {
val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, destination, sessionIDForSend, sessionIDForReceive, obj, recvType)
return suspendAndExpectReceive(result)
}
@Suspendable
fun <T : Any> receive(topic: String, sessionIDForReceive: Long, recvType: Class<T>): UntrustworthyData<T> {
val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, null, -1, sessionIDForReceive, null, recvType)
return suspendAndExpectReceive(result)
}
@Suspendable
fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) {
val result = StateMachineManager.FiberRequest.NotExpectingResponse(topic, destination, sessionID, obj)
parkAndSerialize { fiber, writer -> suspendFunc!!(result, writer.write(fiber)) }
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.utilities
/**
* A small utility to approximate taint tracking: if a method gives you back one of these, it means the data came from
* a remote source that may be incentivised to pass us junk that violates basic assumptions and thus must be checked
* first. The wrapper helps you to avoid forgetting this vital step. Things you might want to check are:
*
* - Is this object the one you actually expected? Did the other side hand you back something technically valid but
* not what you asked for?
* - Is the object disobeying its own invariants?
* - Are any objects *reachable* from this object mismatched or not what you expected?
* - Is it suspiciously large or small?
*/
class UntrustworthyData<T>(private val fromUntrustedWorld: T) {
val data: T
@Deprecated("Accessing the untrustworthy data directly without validating it first is a bad idea")
get() = fromUntrustedWorld
@Suppress("DEPRECATION")
inline fun <R> validate(validator: (T) -> R) = validator(data)
}

View File

@ -12,6 +12,7 @@ import co.paralleluniverse.fibers.Suspendable
import core.*
import core.crypto.SecureHash
import core.messaging.*
import core.protocols.ProtocolLogic
import core.serialization.serialize
import core.testutils.ALICE
import core.testutils.ALICE_KEY