Making sure non-serialisable objects in FlowException do not interfere with the flow session (#651)

Also TransactionVerificationException no longer has reference to non-serialisable LedgerTransaction
This commit is contained in:
Shams Asari
2017-05-09 17:01:13 +01:00
parent 9d19473578
commit e75732af91
8 changed files with 70 additions and 53 deletions

View File

@ -6,6 +6,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import co.paralleluniverse.strands.Strand
import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
@ -16,7 +17,6 @@ import io.requery.util.CloseableIterator
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.commonName
import net.corda.core.flows.*
import net.corda.core.serialization.*
import net.corda.core.utilities.debug
@ -28,7 +28,6 @@ import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.messaging.TopicSession
import net.corda.node.services.messaging.send
import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
@ -573,6 +572,20 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
val logger = fiber?.logger ?: logger
logger.trace { "Sending $message to party $party @ $address" + if (retryId != null) " with retry $retryId" else "" }
serviceHub.networkService.send(sessionTopic, message, address, retryId = retryId)
val serialized = try {
message.serialize()
} catch (e: KryoException) {
if (message !is ErrorSessionEnd || message.errorResponse == null) throw e
logger.warn("Something in ${message.errorResponse.javaClass.name} is not serialisable. " +
"Instead sending back an exception which is serialisable to ensure session end occurs properly.", e)
// The subclass may have overridden toString so we use that
val exMessage = message.errorResponse.let { if (it.javaClass != FlowException::class.java) it.toString() else it.message }
message.copy(errorResponse = FlowException(exMessage)).serialize()
}
serviceHub.networkService.apply {
send(createMessage(sessionTopic, serialized.bytes), address, retryId = retryId)
}
}
}

View File

@ -33,16 +33,12 @@ import net.corda.flows.NotaryFlow
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.utilities.transaction
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.getTestX509Name
import net.corda.testing.initiateSingleShotFlow
import net.corda.testing.*
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import net.corda.testing.sequence
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
@ -538,6 +534,28 @@ class FlowFrameworkTests {
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello")
}
@Test
fun `serialisation issue in counterparty`() {
node2.registerServiceFlow(ReceiveFlow::class) { SendFlow(NonSerialisableData(1), it) }
val result = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy {
result.getOrThrow()
}
}
@Test
fun `FlowException has non-serialisable object`() {
node2.initiateSingleShotFlow(ReceiveFlow::class) {
ExceptionFlow { NonSerialisableFlowException(NonSerialisableData(1)) }
}
val result = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThatExceptionOfType(FlowException::class.java).isThrownBy {
result.getOrThrow()
}
}
@Test
fun `wait for transaction`() {
val ptx = TransactionBuilder(notary = notary1.info.notaryIdentity)
@ -704,7 +722,7 @@ class FlowFrameworkTests {
@InitiatingFlow
private open class SendFlow(val payload: String, vararg val otherParties: Party) : FlowLogic<Unit>() {
private open class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
}
@ -835,5 +853,8 @@ class FlowFrameworkTests {
}
}
private data class NonSerialisableData(val a: Int)
private class NonSerialisableFlowException(val data: NonSerialisableData) : FlowException()
//endregion Helpers
}