Added a background checkpoint checker to make sure checkpoints are at least deserialisable

This commit is contained in:
Shams Asari
2017-07-04 12:12:50 +01:00
parent 54aa4802f9
commit c6e165947b
20 changed files with 181 additions and 67 deletions

View File

@ -13,6 +13,7 @@ import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import io.requery.util.CloseableIterator
import net.corda.core.*
import net.corda.core.crypto.SecureHash
@ -35,15 +36,18 @@ import net.corda.node.services.messaging.TopicSession
import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
import org.slf4j.Logger
import rx.Observable
import rx.subjects.PublishSubject
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
/**
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachineImpl] objects.
* Each such object represents an instantiation of a (two-party) flow that has reached a particular point.
*
* An implementation of this class will persist state machines to long term storage so they can survive process restarts
@ -75,9 +79,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private val quasarKryoPool = KryoPool.Builder {
val serializer = Fiber.getFiberSerializer(false) as KryoSerializer
DefaultKryoCustomizer.customize(serializer.kryo)
serializer.kryo.addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
serializer.kryo
val classResolver = makeNoWhitelistClassResolver().apply { setKryo(serializer.kryo) }
// TODO The ClassResolver can only be set in the Kryo constructor and Quasar doesn't provide us with a way of doing that
val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true }
serializer.kryo.apply {
field.set(this, classResolver)
DefaultKryoCustomizer.customize(this)
addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
}
}.build()
private object AutoCloseableSerialisationDetector : Serializer<AutoCloseable>() {
@ -107,8 +116,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
val scheduler = FiberScheduler()
sealed class Change {
abstract val logic: FlowLogic<*>
@ -129,14 +136,18 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private val scheduler = FiberScheduler()
private val mutex = ThreadBox(InnerState())
// This thread (only enabled in dev mode) deserialises checkpoints in the background to shake out bugs in checkpoint restore.
// It is null when dev mode is off, in which case no background checking takes place.
private val checkpointCheckerThread = if (serviceHub.configuration.devMode) Executors.newSingleThreadExecutor() else null
// Set to true by the background checker when a newly added checkpoint fails to deserialise. It is inspected once
// the checker executor has been shut down.
@Volatile private var unrestorableCheckpoints = false
// True if we're shutting down, so don't resume anything.
@Volatile private var stopping = false
// How many Fibers are running and not suspended. If zero and stopping is true, then we are halted.
private val liveFibers = ReusableLatch()
// Monitoring support.
private val metrics = serviceHub.monitoringService.metrics
@ -225,6 +236,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
// Account for any expected Fibers in a test scenario.
liveFibers.countDown(allowedUnsuspendedFiberCount)
liveFibers.await()
// Shut the (dev mode only) checkpoint checker down, waiting at most 5 seconds for it to terminate, so that its
// verdict is in before we look at it.
checkpointCheckerThread?.let { MoreExecutors.shutdownAndAwaitTermination(it, 5, SECONDS) }
// Fail loudly if any checkpoint written during this run could not be deserialised again.
check(!unrestorableCheckpoints) { "Unrestorable checkpoints were created, please check the logs for details." }
}
/**
@ -239,12 +252,13 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun restoreFibersFromCheckpoints() {
mutex.locked {
checkpointStorage.forEach {
checkpointStorage.forEach { checkpoint ->
// If a flow is added before start() then don't attempt to restore it
if (!stateMachines.containsValue(it)) {
val fiber = deserializeFiber(it)
initFiber(fiber)
stateMachines[fiber] = it
if (!stateMachines.containsValue(checkpoint)) {
deserializeFiber(checkpoint, logger)?.let {
initFiber(it)
stateMachines[it] = checkpoint
}
}
true
}
@ -396,12 +410,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun deserializeFiber(checkpoint: Checkpoint): FlowStateMachineImpl<*> {
return quasarKryoPool.run { kryo ->
// put the map of token -> tokenized into the kryo context
kryo.withSerializationContext(serializationContext) {
checkpoint.serializedFiber.deserialize(kryo)
}.apply { fromCheckpoint = true }
/**
 * Rebuilds the flow state machine stored in [checkpoint].
 *
 * Deserialisation runs on a Kryo instance borrowed from [quasarKryoPool], inside [serializationContext], and the
 * restored fiber is flagged with `fromCheckpoint = true` before being handed back.
 *
 * @param checkpoint the checkpoint whose serialised fiber should be restored.
 * @param logger receives an error entry, including the cause, if restoration fails.
 * @return the restored state machine, or `null` if anything was thrown while restoring it.
 */
private fun deserializeFiber(checkpoint: Checkpoint, logger: Logger): FlowStateMachineImpl<*>? {
    try {
        return quasarKryoPool.run { kryo ->
            // The serialization context makes the token -> tokenized map available to Kryo.
            val restoredFiber = kryo.withSerializationContext(serializationContext) {
                checkpoint.serializedFiber.deserialize(kryo)
            }
            restoredFiber.fromCheckpoint = true
            restoredFiber
        }
    } catch (t: Throwable) {
        // Deliberately broad: any failure at all means this checkpoint cannot be restored.
        logger.error("Encountered unrestorable checkpoint!", t)
        return null
    }
}
@ -508,6 +527,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
checkpointStorage.addCheckpoint(newCheckpoint)
checkpointingMeter.mark()
checkpointCheckerThread?.execute {
// Immediately check that the checkpoint is valid by deserialising it. The idea is to plug any holes we have
// in our testing by failing any test where unrestorable checkpoints are created.
if (deserializeFiber(newCheckpoint, fiber.logger) == null) {
unrestorableCheckpoints = true
}
}
}
private fun resumeFiber(fiber: FlowStateMachineImpl<*>) {

View File

@ -32,6 +32,7 @@ import net.corda.testing.node.MockNetwork.MockNode
import net.corda.testing.sequence
import org.apache.commons.io.IOUtils
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Assert.assertArrayEquals
import org.junit.Before
import org.junit.Test
@ -76,6 +77,11 @@ class CordaRPCOpsImplTest {
}
}
/** JUnit tear-down hook: calls `mockNet.stopNodes()` after each test. */
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `cash issue accepted`() {
val quantity = 1000L

View File

@ -17,6 +17,7 @@ import net.corda.node.utilities.transaction
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.makeTestDataSourceProperties
import org.jetbrains.exposed.sql.Database
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.io.ByteArrayInputStream
@ -38,12 +39,15 @@ class AttachmentTests {
/**
 * JUnit set-up hook: creates a fresh [MockNetwork] and builds the Requery [configuration] from test data source
 * properties before each test.
 */
@Before
fun setUp() {
mockNet = MockNetwork()
val dataSourceProperties = makeTestDataSourceProperties()
configuration = RequeryConfiguration(dataSourceProperties)
}
/** JUnit tear-down hook: calls `mockNet.stopNodes()` on the network created in [setUp] after each test. */
@After
fun cleanUp() {
mockNet.stopNodes()
}
fun fakeAttachment(): ByteArray {
val bs = ByteArrayOutputStream()
val js = JarOutputStream(bs)

View File

@ -7,6 +7,7 @@ import net.corda.node.services.messaging.TopicStringValidator
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.network.NetworkMapService
import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
@ -16,8 +17,13 @@ import kotlin.test.assertTrue
class InMemoryMessagingTests {
val mockNet = MockNetwork()
/** JUnit tear-down hook: calls `mockNet.stopNodes()` after each test. */
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun topicStringValidation() {
fun `topic string validation`() {
TopicStringValidator.check("this.is.ok")
TopicStringValidator.check("this.is.OkAlso")
assertFails {

View File

@ -3,7 +3,6 @@ package net.corda.node.messaging
import co.paralleluniverse.fibers.Suspendable
import net.corda.contracts.CommercialPaper
import net.corda.contracts.asset.*
import net.corda.testing.contracts.fillWithSomeTestCash
import net.corda.core.*
import net.corda.core.contracts.*
import net.corda.core.crypto.DigitalSignature
@ -27,7 +26,8 @@ import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.*
import net.corda.core.utilities.LogHelper
import net.corda.core.utilities.unwrap
import net.corda.flows.TwoPartyTradeFlow.Buyer
import net.corda.flows.TwoPartyTradeFlow.Seller
import net.corda.node.internal.AbstractNode
@ -37,6 +37,7 @@ import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.node.utilities.transaction
import net.corda.testing.*
import net.corda.testing.contracts.fillWithSomeTestCash
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThat
@ -76,6 +77,7 @@ class TwoPartyTradeFlowTests {
/**
 * JUnit tear-down hook: calls `mockNet.stopNodes()` and then resets the log configuration for the logger names
 * listed below.
 */
@After
fun after() {
mockNet.stopNodes()
LogHelper.reset("platform.trade", "core.contract.TransactionGroup", "recordingmap")
}

View File

@ -25,6 +25,7 @@ import java.time.Clock
open class MockServiceHubInternal(
override val database: Database,
override val configuration: NodeConfiguration,
val customVault: VaultService? = null,
val customVaultQuery: VaultQueryService? = null,
val keyManagement: KeyManagementService? = null,
@ -60,8 +61,6 @@ open class MockServiceHubInternal(
get() = overrideClock ?: throw UnsupportedOperationException()
override val myInfo: NodeInfo
get() = throw UnsupportedOperationException()
override val configuration: NodeConfiguration
get() = throw UnsupportedOperationException()
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val rpcFlows: List<Class<out FlowLogic<*>>>
get() = throw UnsupportedOperationException()

View File

@ -8,16 +8,17 @@ import net.corda.core.identity.Party
import net.corda.core.node.services.ServiceInfo
import net.corda.core.seconds
import net.corda.core.transactions.WireTransaction
import net.corda.testing.DUMMY_NOTARY
import net.corda.flows.NotaryChangeFlow
import net.corda.flows.StateReplacementException
import net.corda.node.internal.AbstractNode
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.getTestPartyAndCertificate
import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.bouncycastle.asn1.x500.X500Name
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Instant
@ -45,6 +46,11 @@ class NotaryChangeTests {
mockNet.runNetwork() // Clear network map registration messages
}
/** JUnit tear-down hook: calls `mockNet.stopNodes()` after each test. */
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `should change notary for a state with single participant`() {
val state = issueState(clientNodeA, oldNotaryNode)

View File

@ -21,10 +21,12 @@ import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.transaction
import net.corda.testing.getTestX509Name
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockKeyManagementService
import net.corda.testing.node.TestClock
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.testNodeConfiguration
import org.assertj.core.api.Assertions.assertThat
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
@ -32,6 +34,7 @@ import org.junit.After
import org.junit.Before
import org.junit.Test
import java.io.Closeable
import java.nio.file.Paths
import java.security.PublicKey
import java.time.Clock
import java.time.Instant
@ -67,7 +70,6 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
val testReference: NodeSchedulerServiceTest
}
@Before
fun setup() {
countDown = CountDownLatch(1)
@ -87,7 +89,12 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
InMemoryMessagingNetwork.PeerHandle(0, nullIdentity),
AffinityExecutor.ServiceAffinityExecutor("test", 1),
database)
services = object : MockServiceHubInternal(database, overrideClock = testClock, keyManagement = kms, network = mockMessagingService), TestReference {
services = object : MockServiceHubInternal(
database,
testNodeConfiguration(Paths.get("."), getTestX509Name("Alice")),
overrideClock = testClock,
keyManagement = kms,
network = mockMessagingService), TestReference {
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
override val testReference = this@NodeSchedulerServiceTest
}

View File

@ -12,14 +12,15 @@ import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.node.services.unconsumedStates
import net.corda.core.transactions.SignedTransaction
import net.corda.testing.DUMMY_NOTARY
import net.corda.flows.BroadcastTransactionFlow.NotifyTxRequest
import net.corda.node.services.NotifyTransactionHandler
import net.corda.node.utilities.transaction
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.MEGA_CORP
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import kotlin.test.assertEquals
@ -35,6 +36,11 @@ class DataVendingServiceTests {
mockNet = MockNetwork()
}
/** JUnit tear-down hook: calls `mockNet.stopNodes()` after each test. */
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `notify of transaction`() {
val (vaultServiceNode, registerNode) = mockNet.createTwoNodes()

View File

@ -54,6 +54,7 @@ import org.junit.Before
import org.junit.Test
import rx.Notification
import rx.Observable
import java.time.Instant
import java.util.*
import kotlin.reflect.KClass
import kotlin.test.assertEquals
@ -106,11 +107,7 @@ class FlowFrameworkTests {
@Test
fun `flow can lazily use the serviceHub in its constructor`() {
val flow = object : FlowLogic<Unit>() {
val lazyTime by lazy { serviceHub.clock.instant() }
@Suspendable
override fun call() = Unit
}
val flow = LazyServiceHubAccessFlow()
node1.services.startFlow(flow)
assertThat(flow.lazyTime).isNotNull()
}
@ -754,6 +751,12 @@ class FlowFrameworkTests {
.toFuture()
}
/**
 * Flow whose only state is [lazyTime], a lazily initialised property that reads the service hub's clock on first
 * access. [call] itself does nothing.
 *
 * NOTE(review): presumably declared as a named class, rather than an anonymous object inside the test, so that the
 * flow does not drag the enclosing test instance into its checkpoint — confirm.
 */
private class LazyServiceHubAccessFlow : FlowLogic<Unit>() {
// Evaluated on first access rather than at construction time.
val lazyTime: Instant by lazy { serviceHub.clock.instant() }
@Suspendable
override fun call() = Unit
}
private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
@Transient var flowStarted = false

View File

@ -10,14 +10,15 @@ import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.seconds
import net.corda.core.transactions.SignedTransaction
import net.corda.testing.DUMMY_NOTARY
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
import net.corda.node.internal.AbstractNode
import net.corda.node.services.network.NetworkMapService
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Instant
@ -30,7 +31,8 @@ class NotaryServiceTests {
lateinit var notaryNode: MockNetwork.MockNode
lateinit var clientNode: MockNetwork.MockNode
@Before fun setup() {
@Before
fun setup() {
mockNet = MockNetwork()
notaryNode = mockNet.createNode(
legalName = DUMMY_NOTARY.name,
@ -39,7 +41,13 @@ class NotaryServiceTests {
mockNet.runNetwork() // Clear network map registration messages
}
@Test fun `should sign a unique transaction with a valid time-window`() {
/** JUnit tear-down hook: calls `mockNet.stopNodes()` after each test. */
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `should sign a unique transaction with a valid time-window`() {
val stx = run {
val inputState = issueState(clientNode)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
@ -52,7 +60,8 @@ class NotaryServiceTests {
signatures.forEach { it.verify(stx.id) }
}
@Test fun `should sign a unique transaction without a time-window`() {
@Test
fun `should sign a unique transaction without a time-window`() {
val stx = run {
val inputState = issueState(clientNode)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
@ -64,7 +73,8 @@ class NotaryServiceTests {
signatures.forEach { it.verify(stx.id) }
}
@Test fun `should report error for transaction with an invalid time-window`() {
@Test
fun `should report error for transaction with an invalid time-window`() {
val stx = run {
val inputState = issueState(clientNode)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
@ -78,7 +88,8 @@ class NotaryServiceTests {
assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java)
}
@Test fun `should sign identical transaction multiple times (signing is idempotent)`() {
@Test
fun `should sign identical transaction multiple times (signing is idempotent)`() {
val stx = run {
val inputState = issueState(clientNode)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
@ -95,7 +106,8 @@ class NotaryServiceTests {
assertEquals(f1.resultFuture.getOrThrow(), f2.resultFuture.getOrThrow())
}
@Test fun `should report conflict when inputs are reused across transactions`() {
@Test
fun `should report conflict when inputs are reused across transactions`() {
val inputState = issueState(clientNode)
val stx = run {
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)

View File

@ -10,16 +10,17 @@ import net.corda.core.crypto.DigitalSignature
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.transactions.SignedTransaction
import net.corda.testing.DUMMY_NOTARY
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
import net.corda.node.internal.AbstractNode
import net.corda.node.services.issueInvalidState
import net.corda.node.services.network.NetworkMapService
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.MEGA_CORP_KEY
import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.*
@ -31,7 +32,8 @@ class ValidatingNotaryServiceTests {
lateinit var notaryNode: MockNetwork.MockNode
lateinit var clientNode: MockNetwork.MockNode
@Before fun setup() {
@Before
fun setup() {
mockNet = MockNetwork()
notaryNode = mockNet.createNode(
legalName = DUMMY_NOTARY.name,
@ -41,7 +43,13 @@ class ValidatingNotaryServiceTests {
mockNet.runNetwork() // Clear network map registration messages
}
@Test fun `should report error for invalid transaction dependency`() {
/** JUnit tear-down hook: calls `mockNet.stopNodes()` after each test. */
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `should report error for invalid transaction dependency`() {
val stx = run {
val inputState = issueInvalidState(clientNode, notaryNode.info.notaryIdentity)
val tx = TransactionType.General.Builder(notaryNode.info.notaryIdentity).withItems(inputState)
@ -54,7 +62,8 @@ class ValidatingNotaryServiceTests {
assertThat(ex.error).isInstanceOf(NotaryError.SignaturesInvalid::class.java)
}
@Test fun `should report error for missing signatures`() {
@Test
fun `should report error for missing signatures`() {
val expectedMissingKey = MEGA_CORP_KEY.public
val stx = run {
val inputState = issueState(clientNode)