Clarify exception handling in the SMM add method a bit.

This commit is contained in:
Mike Hearn
2016-08-17 14:57:59 +01:00
parent db3aa1491c
commit 6adebd3fa8
3 changed files with 33 additions and 26 deletions

View File

@ -99,21 +99,17 @@ object DataVending {
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming // TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties? // cash without from unknown parties?
try { services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty)) .success {
.success { services.recordTransactions(req.tx)
services.recordTransactions(req.tx) val resp = NotifyTxResponseMessage(true)
val resp = NotifyTxResponseMessage(true) val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) net.send(msg, req.getReplyTo(services.networkMapCache))
net.send(msg, req.getReplyTo(services.networkMapCache)) }.failure { throwable ->
}.failure { throwable -> val resp = NotifyTxResponseMessage(false)
val resp = NotifyTxResponseMessage(false) val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) net.send(msg, req.getReplyTo(services.networkMapCache))
net.send(msg, req.getReplyTo(services.networkMapCache)) }
}
} catch(t: Exception) {
// Already handled by the hooks on the future, ignore
}
} }
private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> { private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> {

View File

@ -12,6 +12,7 @@ import com.r3corda.core.utilities.UntrustworthyData
import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.api.ServiceHubInternal
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.util.concurrent.ExecutionException
/** /**
* A ProtocolStateMachine instance is a suspendable fiber that delegates all actual logic to a [ProtocolLogic] instance. * A ProtocolStateMachine instance is a suspendable fiber that delegates all actual logic to a [ProtocolLogic] instance.
@ -62,7 +63,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
} catch (t: Throwable) { } catch (t: Throwable) {
actionOnEnd() actionOnEnd()
_resultFuture?.setException(t) _resultFuture?.setException(t)
throw t throw ExecutionException(t)
} }
// This is to prevent actionOnEnd being called twice if it throws an exception // This is to prevent actionOnEnd being called twice if it throws an exception

View File

@ -26,6 +26,7 @@ import java.io.PrintWriter
import java.io.StringWriter import java.io.StringWriter
import java.util.* import java.util.*
import java.util.Collections.synchronizedMap import java.util.Collections.synchronizedMap
import java.util.concurrent.ExecutionException
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
/** /**
@ -216,23 +217,32 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
*/ */
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> { fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName) val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
// Need to add before iterating in case of immediate completion
initFiber(fiber) {
val checkpoint = Checkpoint(serializeFiber(fiber), null)
checkpointStorage.addCheckpoint(checkpoint)
checkpoint
}
try { try {
// Need to add before iterating in case of immediate completion
initFiber(fiber) {
val checkpoint = Checkpoint(serializeFiber(fiber), null)
checkpointStorage.addCheckpoint(checkpoint)
checkpoint
}
executor.executeASAP { executor.executeASAP {
iterateStateMachine(fiber, null) { iterateStateMachine(fiber, null) {
fiber.start() fiber.start()
} }
totalStartedProtocols.inc() totalStartedProtocols.inc()
} }
} catch (e: Throwable) { } catch (e: ExecutionException) {
// TODO: We should be able to remove this as we get more confident that we never fail to log exceptions. // There are two ways we can take exceptions in this method:
e.printStackTrace() //
check(fiber.resultFuture.isDone) // 1) A bug in the SMM code itself whilst setting up the new protocol. In that case the exception will
// propagate out of this method as it would for any method.
//
// 2) An exception in the first part of the fiber after it's been invoked for the first time via
// fiber.start(). In this case the exception will be caught and stashed in the protocol logic future,
// then sent to the unhandled exception handler above which logs it, and is then rethrown wrapped
// in an ExecutionException or RuntimeException+EE so we can just catch it here and ignore it.
} catch (e: RuntimeException) {
if (e.cause !is ExecutionException)
throw e
} }
return fiber.resultFuture return fiber.resultFuture
} }