All uses of CheckpointStorage.getAllCheckpoints() close the stream after use (#5230)

This commit is contained in:
Shams Asari 2019-06-19 09:45:38 +01:00 committed by GitHub
parent f01f8a129e
commit 843c3a0190
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 42 deletions

View File

@ -36,22 +36,24 @@ object CheckpointVerifier {
val cordappsByHash = currentCordapps.associateBy { it.jarHash }
checkpointStorage.getAllCheckpoints().forEach { (_, serializedCheckpoint) ->
val checkpoint = try {
serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
} catch (e: ClassNotFoundException) {
val message = e.message
if (message != null) {
throw CheckpointIncompatibleException.CordappNotInstalledException(message)
} else {
checkpointStorage.getAllCheckpoints().use {
it.forEach { (_, serializedCheckpoint) ->
val checkpoint = try {
serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
} catch (e: ClassNotFoundException) {
val message = e.message
if (message != null) {
throw CheckpointIncompatibleException.CordappNotInstalledException(message)
} else {
throw CheckpointIncompatibleException.CannotBeDeserialisedException(e)
}
} catch (e: Exception) {
throw CheckpointIncompatibleException.CannotBeDeserialisedException(e)
}
} catch (e: Exception) {
throw CheckpointIncompatibleException.CannotBeDeserialisedException(e)
}
// For each Subflow, compare the checkpointed version to the current version.
checkpoint.subFlowStack.forEach { checkFlowCompatible(it, cordappsByHash, platformVersion) }
// For each Subflow, compare the checkpointed version to the current version.
checkpoint.subFlowStack.forEach { checkFlowCompatible(it, cordappsByHash, platformVersion) }
}
}
}

View File

@ -13,10 +13,7 @@ import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
@ -37,11 +34,7 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.errorAndTerminate
@ -57,12 +50,13 @@ import rx.subjects.PublishSubject
import java.lang.Integer.min
import java.security.SecureRandom
import java.util.HashSet
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
import kotlin.collections.HashMap
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.streams.toList
/**
@ -322,19 +316,21 @@ class SingleThreadedStateMachineManager(
}
private fun restoreFlowsFromCheckpoints(): List<Flow> {
return checkpointStorage.getAllCheckpoints().map { (id, serializedCheckpoint) ->
// If a flow is added before start() then don't attempt to restore it
mutex.locked { if (flows.containsKey(id)) return@map null }
val checkpoint = deserializeCheckpoint(serializedCheckpoint) ?: return@map null
logger.debug { "Restored $checkpoint" }
createFlowFromCheckpoint(
id = id,
checkpoint = checkpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false
)
}.toList().filterNotNull()
return checkpointStorage.getAllCheckpoints().use {
it.mapNotNull { (id, serializedCheckpoint) ->
// If a flow is added before start() then don't attempt to restore it
mutex.locked { if (id in flows) return@mapNotNull null }
val checkpoint = deserializeCheckpoint(serializedCheckpoint) ?: return@mapNotNull null
logger.debug { "Restored $checkpoint" }
createFlowFromCheckpoint(
id = id,
checkpoint = checkpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false
)
}.toList()
}
}
private fun resumeRestoredFlows(flows: List<Flow>) {

View File

@ -31,8 +31,9 @@ import org.junit.Test
import kotlin.streams.toList
internal fun CheckpointStorage.checkpoints(): List<SerializedBytes<Checkpoint>> {
val checkpoints = getAllCheckpoints().toList()
return checkpoints.map { it.second }
return getAllCheckpoints().use {
it.map { it.second }.toList()
}
}
class DBCheckpointStorageTests {