mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
Switch from JavaFlow to Quasar to provide fibers.
Quasar is a more modern, better maintained and more powerful framework. The main improvement is that this lets us avoid the ClassLoader tricks that JavaFlow was requiring, by using an agent. This introduces a requirement to mark methods that might be on a suspended stack as @Suspendable, but means that code interops cleanly. In Java 9 it is hoped that the marking requirement may even go away entirely.
This commit is contained in:
parent
018825d7d7
commit
412212a860
68
build.gradle
68
build.gradle
@ -3,7 +3,7 @@ version '1.0-SNAPSHOT'
|
||||
|
||||
apply plugin: 'java'
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: 'org.jetbrains.dokka'
|
||||
//apply plugin: 'org.jetbrains.dokka'
|
||||
|
||||
allprojects {
|
||||
sourceCompatibility = 1.8
|
||||
@ -12,6 +12,9 @@ allprojects {
|
||||
|
||||
buildscript {
|
||||
ext.kotlin_version = '1.0.0-beta-4584'
|
||||
ext.quasar_version = '0.7.4-SNAPSHOT'
|
||||
ext.asm_version = '0.5.3'
|
||||
|
||||
repositories {
|
||||
mavenCentral()
|
||||
jcenter()
|
||||
@ -22,7 +25,9 @@ buildscript {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
repositories {
|
||||
mavenLocal()
|
||||
mavenCentral()
|
||||
maven {
|
||||
url 'http://oss.sonatype.org/content/repositories/snapshots'
|
||||
@ -30,38 +35,61 @@ repositories {
|
||||
jcenter()
|
||||
}
|
||||
|
||||
configurations {
|
||||
quasar
|
||||
}
|
||||
|
||||
// This block makes Gradle print an error and refuse to continue if we get multiple versions of the same library
|
||||
// included simultaneously.
|
||||
configurations.all() {
|
||||
resolutionStrategy {
|
||||
failOnVersionConflict()
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
testCompile 'junit:junit:4.11'
|
||||
testCompile 'junit:junit:4.12'
|
||||
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
|
||||
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
|
||||
compile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
|
||||
compile "com.google.guava:guava:18.0"
|
||||
compile "com.esotericsoftware:kryo:3.0.3"
|
||||
compile("com.esotericsoftware:kryo:3.0.3") {
|
||||
force = true
|
||||
}
|
||||
compile "de.javakaffee:kryo-serializers:0.37"
|
||||
compile "com.google.code.findbugs:jsr305:3.0.1"
|
||||
|
||||
// Logging
|
||||
compile "org.slf4j:slf4j-jdk14:1.7.13"
|
||||
|
||||
// For the continuations in the state machine tests. Note: JavaFlow is old and unmaintained but still seems to work
|
||||
// just fine, once the patch here is applied to update it to a Java8 compatible asm:
|
||||
//
|
||||
// https://github.com/playframework/play1/commit/e0e28e6780a48c000e7ed536962f1f284cef9437
|
||||
//
|
||||
// Obviously using this year-old upload to Maven Central by the Maven Play Plugin team is a short term hack for
|
||||
// experimenting. Using this for real would mean forking JavaFlow and taking over maintenance (luckily it's small
|
||||
// and Java is stable, so this is unlikely to be a big burden). We have to manually force an in-place upgrade to
|
||||
// asm 5.0.3 here (javaflow wants 5.0.2) in order to avoid version conflicts. This is also something that should be
|
||||
// fixed in any fork. Sadly, even Jigsaw doesn't solve this problem out of the box.
|
||||
compile "com.google.code.maven-play-plugin.org.apache.commons:commons-javaflow:1590792-patched-play-1.3.0"
|
||||
compile "org.ow2.asm:asm:5.0.3"
|
||||
compile "org.ow2.asm:asm-analysis:5.0.3"
|
||||
compile "org.ow2.asm:asm-tree:5.0.3"
|
||||
compile "org.ow2.asm:asm-commons:5.0.3"
|
||||
compile "org.ow2.asm:asm-util:5.0.3"
|
||||
// Quasar: for the bytecode rewriting for state machines.
|
||||
compile("co.paralleluniverse:quasar-core:${quasar_version}:jdk8") {
|
||||
// Quasar currently depends on an old version of Kryo, but it works fine with the newer version, so exclude it
|
||||
// here so the newer version is picked up.
|
||||
exclude group: "com.esotericsoftware.kryo", module: "kryo"
|
||||
}
|
||||
quasar("co.paralleluniverse:quasar-core:${quasar_version}:jdk8@jar") {
|
||||
exclude group: "com.esotericsoftware.kryo", module: "kryo"
|
||||
}
|
||||
|
||||
// For visualisation
|
||||
compile "org.graphstream:gs-core:1.3"
|
||||
compile "org.graphstream:gs-ui:1.3"
|
||||
compile "com.intellij:forms_rt:7.0.3"
|
||||
compile("com.intellij:forms_rt:7.0.3") {
|
||||
exclude group: "asm"
|
||||
}
|
||||
}
|
||||
|
||||
// These lines tell Gradle to add a couple of JVM command line arguments to unit test and program runs, which set up
|
||||
// the Quasar bytecode rewriting system so fibers can be suspended. The verifyInstrumentation line makes things run
|
||||
// slower but you get a much better error message if you forget to annotate a method with @Suspendable that needs it.
|
||||
//
|
||||
// In Java 9 (hopefully) the requirement to annotate methods as @Suspendable will go away.
|
||||
tasks.withType(Test) {
|
||||
jvmArgs "-javaagent:${configurations.quasar.singleFile}"
|
||||
jvmArgs "-Dco.paralleluniverse.fibers.verifyInstrumentation"
|
||||
}
|
||||
tasks.withType(JavaExec) {
|
||||
jvmArgs "-javaagent:${configurations.quasar.singleFile}"
|
||||
jvmArgs "-Dco.paralleluniverse.fibers.verifyInstrumentation"
|
||||
}
|
||||
|
@ -8,15 +8,20 @@
|
||||
|
||||
package contracts.protocols
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import contracts.Cash
|
||||
import contracts.sumCashBy
|
||||
import core.*
|
||||
import core.messaging.*
|
||||
import core.messaging.ProtocolStateMachine
|
||||
import core.messaging.SingleMessageRecipient
|
||||
import core.messaging.StateMachineManager
|
||||
import core.serialization.deserialize
|
||||
import core.utilities.trace
|
||||
import java.security.KeyPair
|
||||
import java.security.PublicKey
|
||||
|
||||
// TODO: Get rid of the "initial args" concept and just use the class c'tors, now we are using Quasar.
|
||||
|
||||
/**
|
||||
* This asset trading protocol has two parties (B and S for buyer and seller) and the following steps:
|
||||
*
|
||||
@ -89,6 +94,7 @@ private class TwoPartyTradeProtocolImpl(private val smm: StateMachineManager) :
|
||||
// the continuation by the state machine framework. Please refer to the documentation website (docs/build/html) to
|
||||
// learn more about the protocol state machine framework.
|
||||
class SellerImpl : Seller() {
|
||||
@Suspendable
|
||||
override fun call(args: SellerInitialArgs): Pair<WireTransaction, LedgerTransaction> {
|
||||
val sessionID = random63BitValue()
|
||||
|
||||
@ -96,7 +102,7 @@ private class TwoPartyTradeProtocolImpl(private val smm: StateMachineManager) :
|
||||
val hello = SellerTradeInfo(args.assetToSell, args.price, args.myKeyPair.public, sessionID)
|
||||
|
||||
val partialTX = sendAndReceive<SignedWireTransaction>(TRADE_TOPIC, args.buyerSessionID, sessionID, hello)
|
||||
logger().trace { "Received partially signed transaction" }
|
||||
logger.trace { "Received partially signed transaction" }
|
||||
|
||||
partialTX.verifySignatures()
|
||||
val wtx: WireTransaction = partialTX.txBits.deserialize()
|
||||
@ -120,7 +126,7 @@ private class TwoPartyTradeProtocolImpl(private val smm: StateMachineManager) :
|
||||
val fullySigned: SignedWireTransaction = partialTX.copy(sigs = partialTX.sigs + ourSignature)
|
||||
// We should run it through our full TransactionGroup of all transactions here.
|
||||
fullySigned.verify()
|
||||
logger().trace { "Built finished transaction, sending back to secondary!" }
|
||||
logger.trace { "Built finished transaction, sending back to secondary!" }
|
||||
|
||||
send(TRADE_TOPIC, args.buyerSessionID, fullySigned)
|
||||
|
||||
@ -135,13 +141,14 @@ private class TwoPartyTradeProtocolImpl(private val smm: StateMachineManager) :
|
||||
|
||||
// The buyer's side of the protocol. See note above Seller to learn about the caveats here.
|
||||
class BuyerImpl : Buyer() {
|
||||
@Suspendable
|
||||
override fun call(args: BuyerInitialArgs): Pair<WireTransaction, LedgerTransaction> {
|
||||
// Wait for a trade request to come in on our pre-provided session ID.
|
||||
val tradeRequest = receive<SellerTradeInfo>(TRADE_TOPIC, args.sessionID)
|
||||
|
||||
// What is the seller trying to sell us?
|
||||
val assetTypeName = tradeRequest.assetForSale.state.javaClass.name
|
||||
logger().trace { "Got trade request for a $assetTypeName" }
|
||||
logger.trace { "Got trade request for a $assetTypeName" }
|
||||
|
||||
// Check the start message for acceptability.
|
||||
check(tradeRequest.sessionID > 0)
|
||||
@ -179,17 +186,17 @@ private class TwoPartyTradeProtocolImpl(private val smm: StateMachineManager) :
|
||||
|
||||
// TODO: Could run verify() here to make sure the only signature missing is the sellers.
|
||||
|
||||
logger().trace { "Sending partially signed transaction to seller" }
|
||||
logger.trace { "Sending partially signed transaction to seller" }
|
||||
|
||||
// TODO: Protect against the buyer terminating here and leaving us in the lurch without the final tx.
|
||||
// TODO: Protect against a malicious buyer sending us back a different transaction to the one we built.
|
||||
val fullySigned = sendAndReceive<SignedWireTransaction>(TRADE_TOPIC, tradeRequest.sessionID, args.sessionID, stx)
|
||||
|
||||
logger().trace { "Got fully signed transaction, verifying ... "}
|
||||
logger.trace { "Got fully signed transaction, verifying ... "}
|
||||
|
||||
val ltx = fullySigned.verifyToLedgerTransaction(serviceHub.identityService)
|
||||
|
||||
logger().trace { "Fully signed transaction was valid. Trade complete! :-)" }
|
||||
logger.trace { "Fully signed transaction was valid. Trade complete! :-)" }
|
||||
|
||||
return Pair(fullySigned.tx, ltx)
|
||||
}
|
||||
|
@ -8,8 +8,14 @@
|
||||
|
||||
package core.messaging
|
||||
|
||||
import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import core.SecureHash
|
||||
import core.ServiceHub
|
||||
@ -17,14 +23,11 @@ import core.serialization.THREAD_LOCAL_KRYO
|
||||
import core.serialization.createKryo
|
||||
import core.serialization.deserialize
|
||||
import core.serialization.serialize
|
||||
import core.sha256
|
||||
import core.utilities.trace
|
||||
import core.whenComplete
|
||||
import org.apache.commons.javaflow.Continuation
|
||||
import org.apache.commons.javaflow.ContinuationClassLoader
|
||||
import org.objenesis.instantiator.ObjectInstantiator
|
||||
import org.objenesis.strategy.InstantiatorStrategy
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executor
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
@ -44,6 +47,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* TODO: This needs extension to the >2 party case.
|
||||
* TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised
|
||||
* continuation is always unique?
|
||||
* TODO: Think about how to bring the system to a clean stop so it can be upgraded without any serialised stacks on disk
|
||||
*/
|
||||
@ThreadSafe
|
||||
class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) {
|
||||
@ -61,13 +65,20 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
}
|
||||
}
|
||||
|
||||
// Used to work around a small limitation in Quasar.
|
||||
private val QUASAR_UNBLOCKER = run {
|
||||
val field = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER")
|
||||
field.isAccessible = true
|
||||
field.get(null)
|
||||
}
|
||||
|
||||
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
|
||||
private class Checkpoint(
|
||||
val continuation: Continuation,
|
||||
val otherSide: MessageRecipients,
|
||||
val loggerName: String,
|
||||
val awaitingTopic: String,
|
||||
val awaitingObjectOfType: String // java class name
|
||||
val serialisedFiber: ByteArray,
|
||||
val otherSide: MessageRecipients,
|
||||
val loggerName: String,
|
||||
val awaitingTopic: String,
|
||||
val awaitingObjectOfType: String // java class name
|
||||
)
|
||||
|
||||
init {
|
||||
@ -77,50 +88,35 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
/** Reads the database map and resurrects any serialised state machines. */
|
||||
private fun restoreCheckpoints() {
|
||||
for (bytes in checkpointsMap.values) {
|
||||
val kryo = createKryo()
|
||||
|
||||
// Set up Kryo to use the JavaFlow classloader when deserialising, so the magical continuation bytecode
|
||||
// rewriting is performed correctly.
|
||||
var _psm: ProtocolStateMachine<*, *>? = null
|
||||
kryo.instantiatorStrategy = object : InstantiatorStrategy {
|
||||
val forwardingTo = kryo.instantiatorStrategy
|
||||
|
||||
override fun <T> newInstantiatorOf(type: Class<T>): ObjectInstantiator<T> {
|
||||
// If this is some object that isn't a state machine, use the default behaviour.
|
||||
if (!ProtocolStateMachine::class.java.isAssignableFrom(type))
|
||||
return forwardingTo.newInstantiatorOf(type)
|
||||
|
||||
// Otherwise, return an 'object instantiator' (i.e. factory) that uses the JavaFlow classloader.
|
||||
@Suppress("UNCHECKED_CAST", "CAST_NEVER_SUCCEEDS")
|
||||
return ObjectInstantiator<T> {
|
||||
val p = loadContinuationClass(type as Class<out ProtocolStateMachine<*, *>>).first
|
||||
// Pass the new object a pointer to the service hub where it can find objects that don't
|
||||
// survive restarts.
|
||||
p.serviceHub = serviceHub
|
||||
_psm = p
|
||||
p as T
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val checkpoint = bytes.deserialize<Checkpoint>(kryo)
|
||||
val continuation = checkpoint.continuation
|
||||
|
||||
// We know _psm can't be null here, because we always serialise a ProtocolStateMachine subclass, so the
|
||||
// code above that does "_psm = p" will always run. But the Kotlin compiler can't know that so we have to
|
||||
// forcibly cast away the nullness with the !! operator.
|
||||
val psm = _psm!!
|
||||
registerStateMachine(psm)
|
||||
val checkpoint = bytes.deserialize<Checkpoint>()
|
||||
val checkpointKey = SecureHash.sha256(bytes)
|
||||
|
||||
// Grab the Kryo engine configured by Quasar for its own stuff, and then do our own configuration on top
|
||||
// so we can deserialised the nested stream that holds the fiber.
|
||||
val psm = deserializeFiber(checkpoint.serialisedFiber)
|
||||
_stateMachines.add(psm)
|
||||
val logger = LoggerFactory.getLogger(checkpoint.loggerName)
|
||||
val awaitingObjectOfType = Class.forName(checkpoint.awaitingObjectOfType)
|
||||
val topic = checkpoint.awaitingTopic
|
||||
|
||||
// And now re-wire the deserialised continuation back up to the network service.
|
||||
setupNextMessageHandler(logger, serviceHub.networkService, continuation, checkpoint.otherSide,
|
||||
awaitingObjectOfType, checkpoint.awaitingTopic, bytes)
|
||||
serviceHub.networkService.runOnNextMessage(topic, runInThread) { netMsg ->
|
||||
val obj: Any = THREAD_LOCAL_KRYO.get().readObject(Input(netMsg.data), awaitingObjectOfType)
|
||||
logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
|
||||
iterateStateMachine(psm, serviceHub.networkService, logger, obj, checkpoint.otherSide, checkpointKey) {
|
||||
Fiber.unparkDeserialized(it, SameThreadFiberScheduler)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun deserializeFiber(bits: ByteArray): ProtocolStateMachine<*, *> {
|
||||
val deserializer = Fiber.getFiberSerializer() as KryoSerializer
|
||||
val kryo = createKryo(deserializer.kryo)
|
||||
val psm = kryo.readClassAndObject(Input(bits)) as ProtocolStateMachine<*, *>
|
||||
return psm
|
||||
}
|
||||
|
||||
/**
|
||||
* Kicks off a brand new state machine of the given class. It will send messages to the network node identified by
|
||||
* the [otherSide] parameter, log with the named logger, and the [initialArgs] object will be passed to the call
|
||||
@ -128,99 +124,84 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
* and will be removed once it completes.
|
||||
*/
|
||||
fun <T : ProtocolStateMachine<I, *>, I> add(otherSide: MessageRecipients, initialArgs: I, loggerName: String,
|
||||
continuationClass: Class<out T>): T {
|
||||
klass: Class<out T>): T {
|
||||
val logger = LoggerFactory.getLogger(loggerName)
|
||||
val (sm, continuation) = loadContinuationClass(continuationClass)
|
||||
sm.serviceHub = serviceHub
|
||||
registerStateMachine(sm)
|
||||
runInThread.execute {
|
||||
// The current state of the continuation is held in the closure attached to the messaging system whenever
|
||||
// the continuation suspends and tells us it expects a response.
|
||||
iterateStateMachine(continuation, serviceHub.networkService, otherSide, initialArgs, logger, null)
|
||||
val fiber = klass.newInstance()
|
||||
iterateStateMachine(fiber, serviceHub.networkService, logger, initialArgs, otherSide, null) {
|
||||
it.start()
|
||||
}
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return sm as T
|
||||
return fiber
|
||||
}
|
||||
|
||||
private fun registerStateMachine(psm: ProtocolStateMachine<*, *>) {
|
||||
_stateMachines.add(psm)
|
||||
psm.resultFuture.whenComplete(runInThread) {
|
||||
_stateMachines.remove(psm)
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
private fun loadContinuationClass(continuationClass: Class<out ProtocolStateMachine<*, *>>): Pair<ProtocolStateMachine<*, *>, Continuation> {
|
||||
val url = continuationClass.protectionDomain.codeSource.location
|
||||
val cl = ContinuationClassLoader(arrayOf(url), this.javaClass.classLoader)
|
||||
val obj = cl.forceLoadClass(continuationClass.name).newInstance() as ProtocolStateMachine<*, *>
|
||||
return Pair(obj, Continuation.startSuspendedWith(obj))
|
||||
}
|
||||
|
||||
private fun persistCheckpoint(prev: ByteArray?, new: ByteArray) {
|
||||
private fun persistCheckpoint(prevCheckpointKey: SecureHash?, new: ByteArray): SecureHash {
|
||||
// It's OK for this to be unsynchronised, as the prev/new byte arrays are specific to a continuation instance,
|
||||
// and the underlying map provided by the database layer is expected to be thread safe.
|
||||
if (prev != null)
|
||||
checkpointsMap.remove(SecureHash.sha256(prev))
|
||||
checkpointsMap[SecureHash.sha256(new)] = new
|
||||
if (prevCheckpointKey != null)
|
||||
checkpointsMap.remove(prevCheckpointKey)
|
||||
val key = SecureHash.sha256(new)
|
||||
checkpointsMap[key] = new
|
||||
return key
|
||||
}
|
||||
|
||||
private fun iterateStateMachine(c: Continuation, net: MessagingService, otherSide: MessageRecipients,
|
||||
continuationInput: Any?, logger: Logger,
|
||||
prevPersistedBytes: ByteArray?): Continuation {
|
||||
// This will resume execution of the run() function inside the continuation at the place it left off.
|
||||
val oldLogger = CONTINUATION_LOGGER.get()
|
||||
val nextState: Continuation? = try {
|
||||
CONTINUATION_LOGGER.set(logger)
|
||||
Continuation.continueWith(c, continuationInput)
|
||||
private fun iterateStateMachine(psm: ProtocolStateMachine<*, *>, net: MessagingService, logger: Logger,
|
||||
obj: Any?, otherSide: MessageRecipients, prevCheckpointKey: SecureHash?,
|
||||
resumeFunc: (ProtocolStateMachine<*, *>) -> Unit) {
|
||||
val onSuspend = fun(request: FiberRequest, serFiber: ByteArray) {
|
||||
// We have a request to do something: send, receive, or send-and-receive.
|
||||
if (request is FiberRequest.ExpectingResponse<*>) {
|
||||
// Prepare a listener on the network that runs in the background thread when we received a message.
|
||||
checkpointAndSetupMessageHandler(logger, net, psm, otherSide, request.responseType,
|
||||
"${request.topic}.${request.sessionIDForReceive}", prevCheckpointKey, serFiber)
|
||||
}
|
||||
// If an object to send was provided (not null), send it now.
|
||||
request.obj?.let {
|
||||
val topic = "${request.topic}.${request.sessionIDForSend}"
|
||||
logger.trace { "-> $topic : message of type ${it.javaClass.name}" }
|
||||
net.send(net.createMessage(topic, it.serialize().bits), otherSide)
|
||||
}
|
||||
if (request is FiberRequest.NotExpectingResponse) {
|
||||
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
||||
iterateStateMachine(psm, net, logger, null, otherSide, prevCheckpointKey) {
|
||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
psm.prepareForResumeWith(serviceHub, obj, logger, onSuspend)
|
||||
|
||||
try {
|
||||
// Now either start or carry on with the protocol from where it left off (or at the start).
|
||||
resumeFunc(psm)
|
||||
|
||||
// We're back! Check if the fiber is finished and if so, clean up.
|
||||
if (psm.isTerminated) {
|
||||
_stateMachines.remove(psm)
|
||||
checkpointsMap.remove(prevCheckpointKey)
|
||||
}
|
||||
} catch (t: Throwable) {
|
||||
logger.error("Caught error whilst invoking protocol state machine", t)
|
||||
throw t
|
||||
} finally {
|
||||
CONTINUATION_LOGGER.set(oldLogger)
|
||||
}
|
||||
|
||||
// If continuation returns null, it's finished and the result future has been set.
|
||||
if (nextState == null)
|
||||
return c
|
||||
|
||||
val req = nextState.value() as? ContinuationResult ?: return c
|
||||
|
||||
// Else, it wants us to do something: send, receive, or send-and-receive.
|
||||
if (req is ContinuationResult.ExpectingResponse<*>) {
|
||||
// Prepare a listener on the network that runs in the background thread when we received a message.
|
||||
val topic = "${req.topic}.${req.sessionIDForReceive}"
|
||||
setupNextMessageHandler(logger, net, nextState, otherSide, req.responseType, topic, prevPersistedBytes)
|
||||
}
|
||||
// If an object to send was provided (not null), send it now.
|
||||
req.obj?.let {
|
||||
val topic = "${req.topic}.${req.sessionIDForSend}"
|
||||
logger.trace { "-> $topic : message of type ${it.javaClass.name}" }
|
||||
net.send(net.createMessage(topic, it.serialize().bits), otherSide)
|
||||
}
|
||||
if (req is ContinuationResult.NotExpectingResponse) {
|
||||
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
||||
return iterateStateMachine(nextState, net, otherSide, null, logger, prevPersistedBytes)
|
||||
} else {
|
||||
return nextState
|
||||
}
|
||||
}
|
||||
|
||||
private fun setupNextMessageHandler(logger: Logger, net: MessagingService, nextState: Continuation,
|
||||
otherSide: MessageRecipients, responseType: Class<*>,
|
||||
topic: String, prevPersistedBytes: ByteArray?) {
|
||||
val checkpoint = Checkpoint(nextState, otherSide, logger.name, topic, responseType.name)
|
||||
private fun checkpointAndSetupMessageHandler(logger: Logger, net: MessagingService, psm: ProtocolStateMachine<*,*>,
|
||||
otherSide: MessageRecipients, responseType: Class<*>,
|
||||
topic: String, prevCheckpointKey: SecureHash?, serialisedFiber: ByteArray) {
|
||||
val checkpoint = Checkpoint(serialisedFiber, otherSide, logger.name, topic, responseType.name)
|
||||
val curPersistedBytes = checkpoint.serialize().bits
|
||||
persistCheckpoint(prevPersistedBytes, curPersistedBytes)
|
||||
persistCheckpoint(prevCheckpointKey, curPersistedBytes)
|
||||
val newCheckpointKey = curPersistedBytes.sha256()
|
||||
net.runOnNextMessage(topic, runInThread) { netMsg ->
|
||||
val obj: Any = THREAD_LOCAL_KRYO.get().readObject(Input(netMsg.data), responseType)
|
||||
logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
|
||||
iterateStateMachine(nextState, net, otherSide, obj, logger, curPersistedBytes)
|
||||
iterateStateMachine(psm, net, logger, obj, otherSide, newCheckpointKey) {
|
||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val CONTINUATION_LOGGER = ThreadLocal<Logger>()
|
||||
object SameThreadFiberScheduler : FiberExecutorScheduler("Same thread scheduler", MoreExecutors.directExecutor())
|
||||
|
||||
/**
|
||||
* The base class that should be used by any object that wishes to act as a protocol state machine. Sub-classes should
|
||||
@ -240,54 +221,89 @@ val CONTINUATION_LOGGER = ThreadLocal<Logger>()
|
||||
* define your own constructor. Instead define a separate class that holds your initial arguments, and take it as
|
||||
* the argument to [call].
|
||||
*/
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
abstract class ProtocolStateMachine<T, R> : Runnable {
|
||||
protected fun logger(): Logger = CONTINUATION_LOGGER.get()
|
||||
|
||||
// These fields shouldn't be serialised.
|
||||
abstract class ProtocolStateMachine<C, R> : Fiber<R>("protocol", SameThreadFiberScheduler) {
|
||||
// These fields shouldn't be serialised, so they are marked @Transient.
|
||||
@Transient private var suspendFunc: ((result: FiberRequest, serFiber: ByteArray) -> Unit)? = null
|
||||
@Transient private var resumeWithObject: Any? = null
|
||||
@Transient protected lateinit var serviceHub: ServiceHub
|
||||
@Transient protected lateinit var logger: Logger
|
||||
@Transient private var _resultFuture: SettableFuture<R> = SettableFuture.create<R>()
|
||||
/** This future will complete when the call method returns. */
|
||||
val resultFuture: ListenableFuture<R> get() = _resultFuture
|
||||
|
||||
/** This field is initialised by the framework to point to various infrastructure submodules. */
|
||||
@Transient lateinit var serviceHub: ServiceHub
|
||||
fun prepareForResumeWith(serviceHub: ServiceHub, withObject: Any?, logger: Logger,
|
||||
suspendFunc: (FiberRequest, ByteArray) -> Unit) {
|
||||
this.suspendFunc = suspendFunc
|
||||
this.logger = logger
|
||||
this.resumeWithObject = withObject
|
||||
this.serviceHub = serviceHub
|
||||
}
|
||||
|
||||
abstract fun call(args: T): R
|
||||
@Suspendable
|
||||
abstract fun call(args: C): R
|
||||
|
||||
override fun run() {
|
||||
// TODO: Catch any exceptions here and put them in the future.
|
||||
val r = call(Continuation.getContext() as T)
|
||||
if (r != null)
|
||||
_resultFuture.set(r)
|
||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||
override fun run(): R {
|
||||
val result = call(resumeWithObject!! as C)
|
||||
if (result != null)
|
||||
_resultFuture.set(result)
|
||||
return result
|
||||
}
|
||||
|
||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||
private fun <T : Any> suspendAndExpectReceive(with: FiberRequest): T {
|
||||
Fiber.parkAndSerialize { fiber, serializer ->
|
||||
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
|
||||
val deserializer = Fiber.getFiberSerializer() as KryoSerializer
|
||||
val kryo = createKryo(deserializer.kryo)
|
||||
val stream = ByteArrayOutputStream()
|
||||
Output(stream).use {
|
||||
kryo.writeClassAndObject(it, this)
|
||||
}
|
||||
suspendFunc!!(with, stream.toByteArray())
|
||||
}
|
||||
val tmp = resumeWithObject ?: throw IllegalStateException("Expected to receive something")
|
||||
resumeWithObject = null
|
||||
return tmp as T
|
||||
}
|
||||
|
||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||
protected fun <T : Any> sendAndReceive(topic: String, sessionIDForSend: Long, sessionIDForReceive: Long,
|
||||
obj: Any, recvType: Class<T>): T {
|
||||
val result = FiberRequest.ExpectingResponse(topic, sessionIDForSend, sessionIDForReceive, obj, recvType)
|
||||
return suspendAndExpectReceive<T>(result)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
protected fun <T : Any> receive(topic: String, sessionIDForReceive: Long, recvType: Class<T>): T {
|
||||
val result = FiberRequest.ExpectingResponse(topic, -1, sessionIDForReceive, null, recvType)
|
||||
return suspendAndExpectReceive<T>(result)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
protected fun send(topic: String, sessionID: Long, obj: Any) {
|
||||
val result = FiberRequest.NotExpectingResponse(topic, sessionID, obj)
|
||||
Fiber.parkAndSerialize { fiber, writer -> suspendFunc!!(result, writer.write(fiber)) }
|
||||
}
|
||||
|
||||
// Convenience functions for Kotlin users.
|
||||
inline protected fun <reified R : Any> sendAndReceive(topic: String, sessionIDForSend: Long,
|
||||
sessionIDForReceive: Long, obj: Any): R {
|
||||
return sendAndReceive(topic, sessionIDForSend, sessionIDForReceive, obj, R::class.java)
|
||||
}
|
||||
inline protected fun <reified R : Any> receive(topic: String, sessionIDForReceive: Long): R {
|
||||
return receive(topic, sessionIDForReceive, R::class.java)
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
|
||||
inline fun <S : Any> ProtocolStateMachine<*, *>.send(topic: String, sessionID: Long, obj: S) =
|
||||
Continuation.suspend(ContinuationResult.NotExpectingResponse(topic, sessionID, obj))
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
inline fun <reified R : Any> ProtocolStateMachine<*, *>.sendAndReceive(
|
||||
topic: String, sessionIDForSend: Long, sessionIDForReceive: Long, obj: Any): R {
|
||||
return Continuation.suspend(ContinuationResult.ExpectingResponse(topic, sessionIDForSend, sessionIDForReceive,
|
||||
obj, R::class.java)) as R
|
||||
}
|
||||
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
inline fun <reified R : Any> ProtocolStateMachine<*, *>.receive(
|
||||
topic: String, sessionIDForReceive: Long): R {
|
||||
return Continuation.suspend(ContinuationResult.ExpectingResponse(topic, -1, sessionIDForReceive, null, R::class.java)) as R
|
||||
}
|
||||
|
||||
open class ContinuationResult(val topic: String, val sessionIDForSend: Long, val sessionIDForReceive: Long, val obj: Any?) {
|
||||
open class FiberRequest(val topic: String, val sessionIDForSend: Long, val sessionIDForReceive: Long, val obj: Any?) {
|
||||
class ExpectingResponse<R : Any>(
|
||||
topic: String,
|
||||
sessionIDForSend: Long,
|
||||
sessionIDForReceive: Long,
|
||||
obj: Any?,
|
||||
val responseType: Class<R>
|
||||
) : ContinuationResult(topic, sessionIDForSend, sessionIDForReceive, obj)
|
||||
) : FiberRequest(topic, sessionIDForSend, sessionIDForReceive, obj)
|
||||
|
||||
class NotExpectingResponse(topic: String, sessionIDForSend: Long, obj: Any?) : ContinuationResult(topic, sessionIDForSend, -1, obj)
|
||||
class NotExpectingResponse(topic: String, sessionIDForSend: Long, obj: Any?) : FiberRequest(topic, sessionIDForSend, -1, obj)
|
||||
}
|
@ -153,8 +153,8 @@ class ImmutableClassSerializer<T : Any>(val klass: KClass<T>) : Serializer<T>()
|
||||
}
|
||||
}
|
||||
|
||||
fun createKryo(): Kryo {
|
||||
return Kryo().apply {
|
||||
fun createKryo(k: Kryo = Kryo()): Kryo {
|
||||
return k.apply {
|
||||
// Allow any class to be deserialized (this is insecure but for prototyping we don't care)
|
||||
isRegistrationRequired = false
|
||||
// Allow construction of objects using a JVM backdoor that skips invoking the constructors, if there is no
|
||||
|
@ -1,31 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.utilities.continuations
|
||||
|
||||
import org.apache.commons.javaflow.Continuation
|
||||
import org.apache.commons.javaflow.ContinuationClassLoader
|
||||
|
||||
/**
|
||||
* A "continuation" is an object that represents a suspended execution of a function. They allow you to write code
|
||||
* that suspends itself half way through, bundles up everything that was on the stack into a (potentially serialisable)
|
||||
* object, and then be resumed from the exact same spot later. Continuations are not natively supported by the JVM
|
||||
* but we can use the Apache JavaFlow library which implements them using bytecode rewriting.
|
||||
*
|
||||
* The primary benefit of using continuations is that state machine/protocol code that would otherwise be very
|
||||
* convoluted and hard to read becomes very clear and straightforward.
|
||||
*
|
||||
* TODO: Document classloader interactions and gotchas here.
|
||||
*/
|
||||
inline fun <reified T : Runnable> loadContinuationClass(classLoader: ClassLoader): Continuation {
|
||||
val klass = T::class.java
|
||||
val url = klass.protectionDomain.codeSource.location
|
||||
val cl = ContinuationClassLoader(arrayOf(url), classLoader)
|
||||
val obj = cl.forceLoadClass(klass.name).newInstance() as Runnable
|
||||
return Continuation.startSuspendedWith(obj)
|
||||
}
|
Loading…
Reference in New Issue
Block a user