Provide useful error message if db iterator is left on stack during checkpointing

AutoCloseables in general are also forbidden as restoring them across node restarts is not supported.
This commit is contained in:
Shams Asari 2017-04-18 12:13:31 +01:00
parent 66890d845a
commit 8e0a0ba8fb
3 changed files with 43 additions and 5 deletions

View File

@ -5,9 +5,14 @@ import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.io.serialization.kryo.KryoSerializer import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.Strand
import com.codahale.metrics.Gauge import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.collect.HashMultimap import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import io.requery.util.CloseableIterator
import net.corda.core.ThreadBox import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
@ -73,8 +78,25 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private val quasarKryoPool = KryoPool.Builder { private val quasarKryoPool = KryoPool.Builder {
val serializer = Fiber.getFiberSerializer(false) as KryoSerializer val serializer = Fiber.getFiberSerializer(false) as KryoSerializer
DefaultKryoCustomizer.customize(serializer.kryo) DefaultKryoCustomizer.customize(serializer.kryo)
serializer.kryo.addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
serializer.kryo
}.build() }.build()
private object AutoCloseableSerialisationDetector : Serializer<AutoCloseable>() {
override fun write(kryo: Kryo, output: Output, closeable: AutoCloseable) {
val message = if (closeable is CloseableIterator<*>) {
"A live Iterator pointing to the database has been detected during flow checkpointing. This may be due " +
"to a Vault query - move it into a private method."
} else {
"${closeable.javaClass.name}, which is a closeable resource, has been detected during flow checkpointing. " +
"Restoring such resources across node restarts is not supported. Make sure code accessing it is " +
"confined to a private method or the reference is nulled out."
}
throw UnsupportedOperationException(message)
}
override fun read(kryo: Kryo, input: Input, type: Class<AutoCloseable>) = throw IllegalStateException("Should not reach here!")
}
companion object { companion object {
private val logger = loggerFor<StateMachineManager>() private val logger = loggerFor<StateMachineManager>()
internal val sessionTopic = TopicSession("platform.session") internal val sessionTopic = TopicSession("platform.session")
@ -84,7 +106,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
(fiber as FlowStateMachineImpl<*>).logger.error("Caught exception from flow", throwable) (fiber as FlowStateMachineImpl<*>).logger.error("Caught exception from flow", throwable)
} }
} }
} }
val scheduler = FiberScheduler() val scheduler = FiberScheduler()

View File

@ -3,6 +3,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import net.corda.contracts.asset.Cash
import net.corda.core.* import net.corda.core.*
import net.corda.core.contracts.DOLLARS import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.DummyState import net.corda.core.contracts.DummyState
@ -13,6 +14,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.unconsumedStates
import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.OpaqueBytes
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
@ -570,6 +572,13 @@ class StateMachineManagerTests {
} }
} }
@Test
fun `lazy db iterator left on stack during checkpointing`() {
val result = node2.services.startFlow(VaultAccessFlow(node1.info.legalIdentity)).resultFuture
net.runNetwork()
assertThatThrownBy { result.getOrThrow() }.hasMessageContaining("Vault").hasMessageContaining("private method")
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////
//region Helpers //region Helpers
@ -739,5 +748,13 @@ class StateMachineManagerTests {
} }
} }
private class VaultAccessFlow(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.vaultService.unconsumedStates<Cash.State>().filter { true }
send(otherParty, "Hello")
}
}
//endregion Helpers //endregion Helpers
} }

View File

@ -14,11 +14,11 @@ import net.corda.core.utilities.Emoji
import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
import net.corda.flows.TwoPartyTradeFlow import net.corda.flows.TwoPartyTradeFlow
import java.nio.file.Path import java.nio.file.Paths
import java.util.* import java.util.*
class BuyerFlow(val otherParty: Party, class BuyerFlow(val otherParty: Party,
private val attachmentsPath: Path, private val attachmentsDirectory: String,
override val progressTracker: ProgressTracker = ProgressTracker(STARTING_BUY)) : FlowLogic<Unit>() { override val progressTracker: ProgressTracker = ProgressTracker(STARTING_BUY)) : FlowLogic<Unit>() {
object STARTING_BUY : ProgressTracker.Step("Seller connected, purchasing commercial paper asset") object STARTING_BUY : ProgressTracker.Step("Seller connected, purchasing commercial paper asset")
@ -31,7 +31,7 @@ class BuyerFlow(val otherParty: Party,
it.automaticallyExtractAttachments = true it.automaticallyExtractAttachments = true
it.storePath it.storePath
} }
services.registerFlowInitiator(SellerFlow::class.java) { BuyerFlow(it, attachmentsPath) } services.registerFlowInitiator(SellerFlow::class.java) { BuyerFlow(it, attachmentsPath.toString()) }
} }
} }
@ -73,7 +73,7 @@ class BuyerFlow(val otherParty: Party,
val cpIssuance = search.call().single() val cpIssuance = search.call().single()
cpIssuance.attachments.first().let { cpIssuance.attachments.first().let {
val p = attachmentsPath.toAbsolutePath().resolve("$it.jar") val p = Paths.get(attachmentsDirectory, "$it.jar")
println(""" println("""
The issuance of the commercial paper came with an attachment. You can find it expanded in this directory: The issuance of the commercial paper came with an attachment. You can find it expanded in this directory: