Merged in mike-upgrade-quasar (pull request #284)

Upgrade Quasar to 0.7.6 and fix an exception handling bug in SMM that it revealed (if an exception was thrown immediately on protocol startup we let it leak instead of capturing it in the future.
This commit is contained in:
Mike Hearn 2016-08-18 18:26:39 +01:00
commit 3f63aecf6c
6 changed files with 41 additions and 32 deletions

View File

@ -1,6 +1,6 @@
buildscript {
ext.kotlin_version = '1.0.3'
ext.quasar_version = '0.7.5'
ext.quasar_version = '0.7.6'
ext.asm_version = '0.5.3'
ext.artemis_version = '1.3.0'
ext.jackson_version = '2.8.0.rc2'

Binary file not shown.

View File

@ -99,21 +99,17 @@ object DataVending {
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
try {
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure { throwable ->
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}
} catch(t: Exception) {
// Already handled by the hooks on the future, ignore
}
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure { throwable ->
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}
}
private fun handleTXRequest(req: FetchDataProtocol.Request): List<SignedTransaction?> {

View File

@ -12,6 +12,7 @@ import com.r3corda.core.utilities.UntrustworthyData
import com.r3corda.node.services.api.ServiceHubInternal
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.ExecutionException
/**
* A ProtocolStateMachine instance is a suspendable fiber that delegates all actual logic to a [ProtocolLogic] instance.
@ -62,7 +63,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
} catch (t: Throwable) {
actionOnEnd()
_resultFuture?.setException(t)
throw t
throw ExecutionException(t)
}
// This is to prevent actionOnEnd being called twice if it throws an exception

View File

@ -26,6 +26,7 @@ import java.io.PrintWriter
import java.io.StringWriter
import java.util.*
import java.util.Collections.synchronizedMap
import java.util.concurrent.ExecutionException
import javax.annotation.concurrent.ThreadSafe
/**
@ -215,25 +216,35 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
* restarted with checkpointed state machines in the storage service.
*/
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
// Need to add before iterating in case of immediate completion
initFiber(fiber) {
val checkpoint = Checkpoint(serializeFiber(fiber), null)
checkpointStorage.addCheckpoint(checkpoint)
checkpoint
}
try {
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
// Need to add before iterating in case of immediate completion
initFiber(fiber) {
val checkpoint = Checkpoint(serializeFiber(fiber), null)
checkpointStorage.addCheckpoint(checkpoint)
checkpoint
}
executor.executeASAP {
iterateStateMachine(fiber, null) {
fiber.start()
}
totalStartedProtocols.inc()
}
return fiber.resultFuture
} catch (e: Throwable) {
e.printStackTrace()
throw e
} catch (e: ExecutionException) {
// There are two ways we can take exceptions in this method:
//
// 1) A bug in the SMM code itself whilst setting up the new protocol. In that case the exception will
// propagate out of this method as it would for any method.
//
// 2) An exception in the first part of the fiber after it's been invoked for the first time via
// fiber.start(). In this case the exception will be caught and stashed in the protocol logic future,
// then sent to the unhandled exception handler above which logs it, and is then rethrown wrapped
// in an ExecutionException or RuntimeException+EE so we can just catch it here and ignore it.
} catch (e: RuntimeException) {
if (e.cause !is ExecutionException)
throw e
}
return fiber.resultFuture
}
private fun updateCheckpoint(psm: ProtocolStateMachineImpl<*>,

View File

@ -7,13 +7,13 @@ import com.r3corda.core.contracts.TransactionType
import com.r3corda.core.contracts.USD
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.MEGA_CORP
import com.r3corda.core.testing.rootCauseExceptions
import com.r3corda.node.internal.testing.MockNetwork
import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
/**
* Tests for the data vending service.
@ -76,10 +76,11 @@ class DataVendingServiceTests {
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
walletServiceNode.info, tx)
// Check it was accepted
// Check it was not accepted
network.runNetwork()
val ex = assertFailsWith<java.util.concurrent.ExecutionException> { notifyPsm.get(1, TimeUnit.SECONDS) }
assertTrue(ex.cause is DataVending.Service.TransactionRejectedError)
assertFailsWith<DataVending.Service.TransactionRejectedError> {
rootCauseExceptions { notifyPsm.get() }
}
// Check the transaction is not in the receiving node
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)