diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt index 0155655a42..e94b3e35c6 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt @@ -99,21 +99,17 @@ object DataVending { // TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming // cash without from unknown parties? - try { - services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty)) - .success { - services.recordTransactions(req.tx) - val resp = NotifyTxResponseMessage(true) - val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) - net.send(msg, req.getReplyTo(services.networkMapCache)) - }.failure { throwable -> - val resp = NotifyTxResponseMessage(false) - val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) - net.send(msg, req.getReplyTo(services.networkMapCache)) - } - } catch(t: Exception) { - // Already handled by the hooks on the future, ignore - } + services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty)) + .success { + services.recordTransactions(req.tx) + val resp = NotifyTxResponseMessage(true) + val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) + net.send(msg, req.getReplyTo(services.networkMapCache)) + }.failure { throwable -> + val resp = NotifyTxResponseMessage(false) + val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) + net.send(msg, req.getReplyTo(services.networkMapCache)) + } } private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> { diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt index 5392b6c5f4..9ece842c44 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt @@ -12,6 +12,7 @@ import com.r3corda.core.utilities.UntrustworthyData import com.r3corda.node.services.api.ServiceHubInternal import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.util.concurrent.ExecutionException /** * A ProtocolStateMachine instance is a suspendable fiber that delegates all actual logic to a [ProtocolLogic] instance. @@ -62,7 +63,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, } catch (t: Throwable) { actionOnEnd() _resultFuture?.setException(t) - throw t + throw ExecutionException(t) } // This is to prevent actionOnEnd being called twice if it throws an exception diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt index 697ca6f638..a84f48d4a0 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt @@ -26,6 +26,7 @@ import java.io.PrintWriter import java.io.StringWriter import java.util.* import java.util.Collections.synchronizedMap +import java.util.concurrent.ExecutionException import javax.annotation.concurrent.ThreadSafe /** @@ -216,23 +217,32 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService */ fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> { val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName) + // Need to add before iterating in case of immediate completion + initFiber(fiber) { + val checkpoint = Checkpoint(serializeFiber(fiber), null) + checkpointStorage.addCheckpoint(checkpoint) + checkpoint + } try { - // Need to add before iterating in case of immediate completion - initFiber(fiber) { - val checkpoint = Checkpoint(serializeFiber(fiber), null) - checkpointStorage.addCheckpoint(checkpoint) - checkpoint - } executor.executeASAP { iterateStateMachine(fiber, null) { fiber.start() } totalStartedProtocols.inc() } - } catch (e: Throwable) { - // TODO: We should be able to remove this as we get more confident that we never fail to log exceptions. - e.printStackTrace() - check(fiber.resultFuture.isDone) + } catch (e: ExecutionException) { + // There are two ways we can take exceptions in this method: + // + // 1) A bug in the SMM code itself whilst setting up the new protocol. In that case the exception will + // propagate out of this method as it would for any method. + // + // 2) An exception in the first part of the fiber after it's been invoked for the first time via + // fiber.start(). In this case the exception will be caught and stashed in the protocol logic future, + // then sent to the unhandled exception handler above which logs it, and is then rethrown wrapped + // in an ExecutionException or RuntimeException+EE so we can just catch it here and ignore it. + } catch (e: RuntimeException) { + if (e.cause !is ExecutionException) + throw e } return fiber.resultFuture }