From dc520392b8ef8f1f553e3e63846a6fb3f213c90c Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Tue, 16 Feb 2016 16:17:51 +0100 Subject: [PATCH] Protocol framework: tweak error handling a bit and add unit test for it. --- .../kotlin/core/messaging/StateMachines.kt | 60 ++++++++++++++----- .../messaging/TwoPartyTradeProtocolTests.kt | 59 ++++++++++++++++-- 2 files changed, 98 insertions(+), 21 deletions(-) diff --git a/src/main/kotlin/core/messaging/StateMachines.kt b/src/main/kotlin/core/messaging/StateMachines.kt index b69ec49ea0..8c284d45de 100644 --- a/src/main/kotlin/core/messaging/StateMachines.kt +++ b/src/main/kotlin/core/messaging/StateMachines.kt @@ -14,6 +14,7 @@ import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.io.serialization.kryo.KryoSerializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import com.google.common.base.Throwables import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture @@ -28,6 +29,8 @@ import core.utilities.trace import org.slf4j.Logger import org.slf4j.LoggerFactory import java.io.ByteArrayOutputStream +import java.io.PrintWriter +import java.io.StringWriter import java.util.* import java.util.concurrent.Callable import java.util.concurrent.Executor @@ -44,7 +47,6 @@ import javax.annotation.concurrent.ThreadSafe * A "state machine" is a class with a single call method. The call method and any others it invokes are rewritten by * a bytecode rewriting engine called Quasar, to ensure the code can be suspended and resumed at any point. * - * TODO: The framework should propagate exceptions and handle error handling automatically. * TODO: Session IDs should be set up and propagated automatically, on demand. * TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised * continuation is always unique? @@ -94,6 +96,10 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) ) init { + // Blank out the default uncaught exception handler because we always catch things ourselves, and the default + // just redundantly prints stack traces to the logs. + Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> } + if (checkpointing) restoreCheckpoints() } @@ -117,12 +123,26 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) val obj: Any = THREAD_LOCAL_KRYO.get().readObject(Input(netMsg.data), awaitingObjectOfType) logger.trace { "<- $topic : message of type ${obj.javaClass.name}" } iterateStateMachine(psm, serviceHub.networkService, logger, obj, checkpointKey) { - Fiber.unparkDeserialized(it, SameThreadFiberScheduler) + try { + Fiber.unparkDeserialized(it, SameThreadFiberScheduler) + } catch(e: Throwable) { + logError(e, logger, obj, topic, it) + } } } } } + private fun logError(e: Throwable, logger: Logger, obj: Any, topic: String, psm: ProtocolStateMachine<*>) { + logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " + + "when handling a message of type ${obj.javaClass.name} on topic $topic") + if (logger.isTraceEnabled) { + val s = StringWriter() + Throwables.getRootCause(e).printStackTrace(PrintWriter(s)) + logger.trace("Stack trace of protocol error is: $s") + } + } + private fun deserializeFiber(bits: ByteArray): ProtocolStateMachine<*> { val deserializer = Fiber.getFiberSerializer() as KryoSerializer val kryo = createKryo(deserializer.kryo) @@ -171,7 +191,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) if (request is FiberRequest.NotExpectingResponse) { // We sent a message, but don't expect a response, so re-enter the continuation to let it keep going. iterateStateMachine(psm, net, logger, null, prevCheckpointKey) { - Fiber.unpark(it, QUASAR_UNBLOCKER) + try { + Fiber.unpark(it, QUASAR_UNBLOCKER) + } catch(e: Throwable) { + logError(e, logger, request.obj!!, request.topic, it) + } } } } @@ -199,7 +223,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) val obj: Any = THREAD_LOCAL_KRYO.get().readObject(Input(netMsg.data), responseType) logger.trace { "<- $topic : message of type ${obj.javaClass.name}" } iterateStateMachine(psm, net, logger, obj, newCheckpointKey) { - Fiber.unpark(it, QUASAR_UNBLOCKER) + try { + Fiber.unpark(it, QUASAR_UNBLOCKER) + } catch(e: Throwable) { + logError(e, logger, obj, topic, it) + } } } } @@ -225,10 +253,9 @@ object SameThreadFiberScheduler : FiberExecutorScheduler("Same thread scheduler" * send/receive/sendAndReceive because the world might change in arbitrary ways out from underneath you, for instance, * if the node is restarted or reconfigured! * - * Note that the result of the [call] method can be obtained in a couple of different ways. One is to call the get - * method, as the PSM is a [Future]. But that will block the calling thread until the result is ready, which may not - * be what you want (unless you know it's finished already). So you can also use the [resultFuture] property, which is - * a [ListenableFuture] and will let you register a callback. + * The result of the [call] method can be obtained by using the [resultFuture] property, which is a [ListenableFuture] + * and will let you register a callback to be informed when the protocol has completed. Note that the PSM class is also + * a future, but not a listenable one. * * Once created, a PSM should be passed to a [StateMachineManager] which will start it and manage its execution. */ @@ -255,10 +282,6 @@ abstract class ProtocolStateMachine : Fiber("protocol", SameThreadFiberSch this.logger = logger this.resumeWithObject = withObject this.serviceHub = serviceHub - - setUncaughtExceptionHandler { strand, throwable -> - logger.error("Caught error whilst running protocol state machine ${strand.javaClass.name}", throwable) - } } // This line may look useless, but it's needed to convince the Quasar bytecode rewriter to do the right thing. @@ -266,10 +289,15 @@ abstract class ProtocolStateMachine : Fiber("protocol", SameThreadFiberSch @Suspendable @Suppress("UNCHECKED_CAST") override fun run(): R { - val result = call() - if (result != null) - (resultFuture as SettableFuture).set(result) - return result + try { + val result = call() + if (result != null) + _resultFuture?.set(result) + return result + } catch (e: Throwable) { + _resultFuture?.setException(e) + throw e + } } @Suspendable @Suppress("UNCHECKED_CAST") diff --git a/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt b/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt index 48c7eaded4..9840f49577 100644 --- a/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt +++ b/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt @@ -18,9 +18,11 @@ import core.utilities.BriefLogFormatter import org.junit.After import org.junit.Before import org.junit.Test +import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import kotlin.test.assertEquals +import kotlin.test.assertFailsWith import kotlin.test.assertTrue /** @@ -46,7 +48,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() { @Test fun `trade cash for commercial paper`() { transactionGroupFor { - val bobsWallet = fillUp().first + val bobsWallet = fillUp(false).first val (alicesAddress, alicesNode) = makeNode(inBackground = true) val (bobsAddress, bobsNode) = makeNode(inBackground = true) @@ -91,7 +93,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() { @Test fun `shut down and restore`() { transactionGroupFor { - val wallet = fillUp().first + val wallet = fillUp(false).first val (alicesAddress, alicesNode) = makeNode(inBackground = false) var (bobsAddress, bobsNode) = makeNode(inBackground = false) @@ -184,7 +186,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() { @Test fun `check dependencies of the sale asset are resolved`() { transactionGroupFor { - val (bobsWallet, fakeTxns) = fillUp() + val (bobsWallet, fakeTxns) = fillUp(false) val (alicesAddress, alicesNode) = makeNode(inBackground = true) val (bobsAddress, bobsNode) = makeNode(inBackground = true) @@ -234,6 +236,52 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() { } } + @Test + fun `dependency with error`() { + transactionGroupFor { + val (bobsWallet, fakeTxns) = fillUp(withError = true) + + val (alicesAddress, alicesNode) = makeNode(inBackground = true) + val (bobsAddress, bobsNode) = makeNode(inBackground = true) + val timestamper = network.setupTimestampingNode(false).first + + val alicesServices = MockServices(net = alicesNode) + val bobsServices = MockServices( + wallet = MockWalletService(bobsWallet.states), + keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)), + net = bobsNode, + storage = MockStorageService(isRecording = true) + ) + loadFakeTxnsIntoStorage(bobsServices.storageService) + + val buyerSessionID = random63BitValue() + + val aliceResult = TwoPartyTradeProtocol.runSeller( + StateMachineManager(alicesServices, backgroundThread), + timestamper, + bobsAddress, + lookup("alice's paper"), + 1000.DOLLARS, + ALICE_KEY, + buyerSessionID + ) + TwoPartyTradeProtocol.runBuyer( + StateMachineManager(bobsServices, backgroundThread), + timestamper, + alicesAddress, + 1000.DOLLARS, + CommercialPaper.State::class.java, + buyerSessionID + ) + + val e = assertFailsWith { + aliceResult.get() + } + assertTrue(e.cause is TransactionVerificationException) + assertTrue(e.cause!!.cause!!.message!!.contains("at least one cash input")) + } + } + private fun TransactionGroupDSL.loadFakeTxnsIntoStorage(ss: StorageService) { val txStorage = ss.validatedTransactions val map = signAll().associateBy { it.id } @@ -243,7 +291,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() { txStorage.putAll(map) } - private fun TransactionGroupDSL.fillUp(): Pair> { + private fun TransactionGroupDSL.fillUp(withError: Boolean): Pair> { // Bob (Buyer) has some cash he got from the Bank of Elbonia, Alice (Seller) has some commercial paper she // wants to sell to Bob. @@ -251,7 +299,8 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() { // Issued money to itself. output("elbonian money 1") { 800.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY } output("elbonian money 2") { 1000.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY } - arg(MEGA_CORP_PUBKEY) { Cash.Commands.Issue() } + if (!withError) + arg(MEGA_CORP_PUBKEY) { Cash.Commands.Issue() } timestamp(TEST_TX_TIME) }