Protocol framework: tweak error handling a bit and add unit test for it.

This commit is contained in:
Mike Hearn 2016-02-16 16:17:51 +01:00
parent df4d926bca
commit dc520392b8
2 changed files with 98 additions and 21 deletions
src
main/kotlin/core/messaging
test/kotlin/core/messaging

@ -14,6 +14,7 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.google.common.base.Throwables
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
@ -28,6 +29,8 @@ import core.utilities.trace
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
import java.io.PrintWriter
import java.io.StringWriter
import java.util.*
import java.util.concurrent.Callable
import java.util.concurrent.Executor
@ -44,7 +47,6 @@ import javax.annotation.concurrent.ThreadSafe
* A "state machine" is a class with a single call method. The call method and any others it invokes are rewritten by
* a bytecode rewriting engine called Quasar, to ensure the code can be suspended and resumed at any point.
*
* TODO: The framework should propagate exceptions and handle error handling automatically.
* TODO: Session IDs should be set up and propagated automatically, on demand.
* TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised
* continuation is always unique?
@ -94,6 +96,10 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
)
init {
// Blank out the default uncaught exception handler because we always catch things ourselves, and the default
// just redundantly prints stack traces to the logs.
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> }
if (checkpointing)
restoreCheckpoints()
}
@ -117,12 +123,26 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
val obj: Any = THREAD_LOCAL_KRYO.get().readObject(Input(netMsg.data), awaitingObjectOfType)
logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
iterateStateMachine(psm, serviceHub.networkService, logger, obj, checkpointKey) {
Fiber.unparkDeserialized(it, SameThreadFiberScheduler)
try {
Fiber.unparkDeserialized(it, SameThreadFiberScheduler)
} catch(e: Throwable) {
logError(e, logger, obj, topic, it)
}
}
}
}
}
private fun logError(e: Throwable, logger: Logger, obj: Any, topic: String, psm: ProtocolStateMachine<*>) {
logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
"when handling a message of type ${obj.javaClass.name} on topic $topic")
if (logger.isTraceEnabled) {
val s = StringWriter()
Throwables.getRootCause(e).printStackTrace(PrintWriter(s))
logger.trace("Stack trace of protocol error is: $s")
}
}
private fun deserializeFiber(bits: ByteArray): ProtocolStateMachine<*> {
val deserializer = Fiber.getFiberSerializer() as KryoSerializer
val kryo = createKryo(deserializer.kryo)
@ -171,7 +191,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
if (request is FiberRequest.NotExpectingResponse) {
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
iterateStateMachine(psm, net, logger, null, prevCheckpointKey) {
Fiber.unpark(it, QUASAR_UNBLOCKER)
try {
Fiber.unpark(it, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, logger, request.obj!!, request.topic, it)
}
}
}
}
@ -199,7 +223,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
val obj: Any = THREAD_LOCAL_KRYO.get().readObject(Input(netMsg.data), responseType)
logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
iterateStateMachine(psm, net, logger, obj, newCheckpointKey) {
Fiber.unpark(it, QUASAR_UNBLOCKER)
try {
Fiber.unpark(it, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, logger, obj, topic, it)
}
}
}
}
@ -225,10 +253,9 @@ object SameThreadFiberScheduler : FiberExecutorScheduler("Same thread scheduler"
* send/receive/sendAndReceive because the world might change in arbitrary ways out from underneath you, for instance,
* if the node is restarted or reconfigured!
*
* Note that the result of the [call] method can be obtained in a couple of different ways. One is to call the get
* method, as the PSM is a [Future]. But that will block the calling thread until the result is ready, which may not
* be what you want (unless you know it's finished already). So you can also use the [resultFuture] property, which is
* a [ListenableFuture] and will let you register a callback.
* The result of the [call] method can be obtained by using the [resultFuture] property, which is a [ListenableFuture]
* and will let you register a callback to be informed when the protocol has completed. Note that the PSM class is also
* a future, but not a listenable one.
*
* Once created, a PSM should be passed to a [StateMachineManager] which will start it and manage its execution.
*/
@ -255,10 +282,6 @@ abstract class ProtocolStateMachine<R> : Fiber<R>("protocol", SameThreadFiberSch
this.logger = logger
this.resumeWithObject = withObject
this.serviceHub = serviceHub
setUncaughtExceptionHandler { strand, throwable ->
logger.error("Caught error whilst running protocol state machine ${strand.javaClass.name}", throwable)
}
}
// This line may look useless, but it's needed to convince the Quasar bytecode rewriter to do the right thing.
@ -266,10 +289,15 @@ abstract class ProtocolStateMachine<R> : Fiber<R>("protocol", SameThreadFiberSch
@Suspendable @Suppress("UNCHECKED_CAST")
override fun run(): R {
val result = call()
if (result != null)
(resultFuture as SettableFuture<R>).set(result)
return result
try {
val result = call()
if (result != null)
_resultFuture?.set(result)
return result
} catch (e: Throwable) {
_resultFuture?.setException(e)
throw e
}
}
@Suspendable @Suppress("UNCHECKED_CAST")

@ -18,9 +18,11 @@ import core.utilities.BriefLogFormatter
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
/**
@ -46,7 +48,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
@Test
fun `trade cash for commercial paper`() {
transactionGroupFor<ContractState> {
val bobsWallet = fillUp().first
val bobsWallet = fillUp(false).first
val (alicesAddress, alicesNode) = makeNode(inBackground = true)
val (bobsAddress, bobsNode) = makeNode(inBackground = true)
@ -91,7 +93,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
@Test
fun `shut down and restore`() {
transactionGroupFor<ContractState> {
val wallet = fillUp().first
val wallet = fillUp(false).first
val (alicesAddress, alicesNode) = makeNode(inBackground = false)
var (bobsAddress, bobsNode) = makeNode(inBackground = false)
@ -184,7 +186,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
@Test
fun `check dependencies of the sale asset are resolved`() {
transactionGroupFor<ContractState> {
val (bobsWallet, fakeTxns) = fillUp()
val (bobsWallet, fakeTxns) = fillUp(false)
val (alicesAddress, alicesNode) = makeNode(inBackground = true)
val (bobsAddress, bobsNode) = makeNode(inBackground = true)
@ -234,6 +236,52 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
}
}
@Test
fun `dependency with error`() {
transactionGroupFor<ContractState> {
val (bobsWallet, fakeTxns) = fillUp(withError = true)
val (alicesAddress, alicesNode) = makeNode(inBackground = true)
val (bobsAddress, bobsNode) = makeNode(inBackground = true)
val timestamper = network.setupTimestampingNode(false).first
val alicesServices = MockServices(net = alicesNode)
val bobsServices = MockServices(
wallet = MockWalletService(bobsWallet.states),
keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)),
net = bobsNode,
storage = MockStorageService(isRecording = true)
)
loadFakeTxnsIntoStorage(bobsServices.storageService)
val buyerSessionID = random63BitValue()
val aliceResult = TwoPartyTradeProtocol.runSeller(
StateMachineManager(alicesServices, backgroundThread),
timestamper,
bobsAddress,
lookup("alice's paper"),
1000.DOLLARS,
ALICE_KEY,
buyerSessionID
)
TwoPartyTradeProtocol.runBuyer(
StateMachineManager(bobsServices, backgroundThread),
timestamper,
alicesAddress,
1000.DOLLARS,
CommercialPaper.State::class.java,
buyerSessionID
)
val e = assertFailsWith<ExecutionException> {
aliceResult.get()
}
assertTrue(e.cause is TransactionVerificationException)
assertTrue(e.cause!!.cause!!.message!!.contains("at least one cash input"))
}
}
private fun TransactionGroupDSL<ContractState>.loadFakeTxnsIntoStorage(ss: StorageService) {
val txStorage = ss.validatedTransactions
val map = signAll().associateBy { it.id }
@ -243,7 +291,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
txStorage.putAll(map)
}
private fun TransactionGroupDSL<ContractState>.fillUp(): Pair<Wallet, List<WireTransaction>> {
private fun TransactionGroupDSL<ContractState>.fillUp(withError: Boolean): Pair<Wallet, List<WireTransaction>> {
// Bob (Buyer) has some cash he got from the Bank of Elbonia, Alice (Seller) has some commercial paper she
// wants to sell to Bob.
@ -251,7 +299,8 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
// Issued money to itself.
output("elbonian money 1") { 800.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY }
output("elbonian money 2") { 1000.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY }
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Issue() }
if (!withError)
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Issue() }
timestamp(TEST_TX_TIME)
}