diff --git a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt index 9c583bd0ca..901d853109 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt @@ -36,22 +36,24 @@ object CheckpointVerifier { val cordappsByHash = currentCordapps.associateBy { it.jarHash } - checkpointStorage.getAllCheckpoints().forEach { (_, serializedCheckpoint) -> - val checkpoint = try { - serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext) - } catch (e: ClassNotFoundException) { - val message = e.message - if (message != null) { - throw CheckpointIncompatibleException.CordappNotInstalledException(message) - } else { + checkpointStorage.getAllCheckpoints().use { + it.forEach { (_, serializedCheckpoint) -> + val checkpoint = try { + serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext) + } catch (e: ClassNotFoundException) { + val message = e.message + if (message != null) { + throw CheckpointIncompatibleException.CordappNotInstalledException(message) + } else { + throw CheckpointIncompatibleException.CannotBeDeserialisedException(e) + } + } catch (e: Exception) { throw CheckpointIncompatibleException.CannotBeDeserialisedException(e) } - } catch (e: Exception) { - throw CheckpointIncompatibleException.CannotBeDeserialisedException(e) - } - // For each Subflow, compare the checkpointed version to the current version. - checkpoint.subFlowStack.forEach { checkFlowCompatible(it, cordappsByHash, platformVersion) } + // For each Subflow, compare the checkpointed version to the current version. + checkpoint.subFlowStack.forEach { checkFlowCompatible(it, cordappsByHash, platformVersion) } + } } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 76994a826e..e872d60fde 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -13,10 +13,7 @@ import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party -import net.corda.core.internal.FlowStateMachine -import net.corda.core.internal.ThreadBox -import net.corda.core.internal.bufferUntilSubscribed -import net.corda.core.internal.castIfPossible +import net.corda.core.internal.* import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture @@ -37,11 +34,7 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion -import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor -import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker -import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor -import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor -import net.corda.node.services.statemachine.interceptors.PrintingInterceptor +import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.errorAndTerminate @@ -57,12 +50,13 @@ import rx.subjects.PublishSubject import java.lang.Integer.min import java.security.SecureRandom import java.util.HashSet -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.ScheduledFuture -import java.util.concurrent.TimeUnit +import java.util.concurrent.* import javax.annotation.concurrent.ThreadSafe +import kotlin.collections.ArrayList +import kotlin.collections.HashMap +import kotlin.collections.component1 +import kotlin.collections.component2 +import kotlin.collections.set import kotlin.streams.toList /** @@ -322,19 +316,21 @@ class SingleThreadedStateMachineManager( } private fun restoreFlowsFromCheckpoints(): List { - return checkpointStorage.getAllCheckpoints().map { (id, serializedCheckpoint) -> - // If a flow is added before start() then don't attempt to restore it - mutex.locked { if (flows.containsKey(id)) return@map null } - val checkpoint = deserializeCheckpoint(serializedCheckpoint) ?: return@map null - logger.debug { "Restored $checkpoint" } - createFlowFromCheckpoint( - id = id, - checkpoint = checkpoint, - initialDeduplicationHandler = null, - isAnyCheckpointPersisted = true, - isStartIdempotent = false - ) - }.toList().filterNotNull() + return checkpointStorage.getAllCheckpoints().use { + it.mapNotNull { (id, serializedCheckpoint) -> + // If a flow is added before start() then don't attempt to restore it + mutex.locked { if (id in flows) return@mapNotNull null } + val checkpoint = deserializeCheckpoint(serializedCheckpoint) ?: return@mapNotNull null + logger.debug { "Restored $checkpoint" } + createFlowFromCheckpoint( + id = id, + checkpoint = checkpoint, + initialDeduplicationHandler = null, + isAnyCheckpointPersisted = true, + isStartIdempotent = false + ) + }.toList() + } } private fun resumeRestoredFlows(flows: List) { diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index 55d62380f5..c39e6e664e 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -31,8 +31,9 @@ import org.junit.Test import kotlin.streams.toList internal fun CheckpointStorage.checkpoints(): List> { - val checkpoints = getAllCheckpoints().toList() - return checkpoints.map { it.second } + return getAllCheckpoints().use { + it.map { it.second }.toList() + } } class DBCheckpointStorageTests {