diff --git a/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt b/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt index 73c9f38fc4..a970c42451 100644 --- a/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt +++ b/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt @@ -1,6 +1,7 @@ package net.corda.core.cordapp import net.corda.core.DoNotImplement +import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.SerializationCustomSerializer @@ -25,7 +26,9 @@ import java.net.URL * @property serializationWhitelists List of Corda plugin registries * @property serializationCustomSerializers List of serializers * @property customSchemas List of custom schemas + * @property allFlows List of all flow classes * @property jarPath The path to the JAR for this CorDapp + * @property jarHash Hash of the jar */ @DoNotImplement interface Cordapp { @@ -39,6 +42,8 @@ interface Cordapp { val serializationWhitelists: List val serializationCustomSerializers: List> val customSchemas: Set + val allFlows: List>> val jarPath: URL val cordappClasses: List + val jarHash: SecureHash.SHA256 } \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt b/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt index c4ab6a2e43..d4092478cf 100644 --- a/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt +++ b/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt @@ -1,6 +1,7 @@ package net.corda.core.internal.cordapp import net.corda.core.cordapp.Cordapp +import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.internal.toPath import net.corda.core.schemas.MappedSchema @@ -19,7 +20,9 @@ data class CordappImpl( override val serializationWhitelists: List, override val serializationCustomSerializers: List>, override val customSchemas: Set, - override val jarPath: URL) : Cordapp { + override val allFlows: List>>, + override val jarPath: URL, + override val jarHash: SecureHash.SHA256) : Cordapp { override val name: String = jarPath.toPath().fileName.toString().removeSuffix(".jar") /** diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index fa83b7e89f..5b6eec9c6c 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -13,6 +13,8 @@ Unreleased * Shell now kills an ongoing flow when CTRL+C is pressed in the terminal. +* Add check at startup that all persisted Checkpoints are compatible with the current version of the code. + * ``ServiceHub`` and ``CordaRPCOps`` can now safely be used from multiple threads without incurring in database transaction problems. * Doorman and NetworkMap url's can now be configured individually rather than being assumed to be diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 388b70ef6f..3cd9d28acc 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -10,16 +10,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.crypto.newSecureRandom import net.corda.core.crypto.sign -import net.corda.core.flows.ContractUpgradeFlow -import net.corda.core.flows.FinalityFlow -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowLogicRefFactory -import net.corda.core.flows.FlowSession -import net.corda.core.flows.InitiatedBy -import net.corda.core.flows.InitiatingFlow -import net.corda.core.flows.NotaryChangeFlow -import net.corda.core.flows.NotaryFlow -import net.corda.core.flows.StartableByService +import net.corda.core.flows.* import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party @@ -31,33 +22,17 @@ import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.notary.NotaryService import net.corda.core.internal.uncheckedCast -import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.FlowHandle -import net.corda.core.messaging.FlowHandleImpl -import net.corda.core.messaging.FlowProgressHandle -import net.corda.core.messaging.FlowProgressHandleImpl -import net.corda.core.messaging.RPCOps -import net.corda.core.node.AppServiceHub -import net.corda.core.node.NetworkParameters -import net.corda.core.node.NodeInfo -import net.corda.core.node.ServiceHub -import net.corda.core.node.ServicesForResolution -import net.corda.core.node.services.AttachmentStorage -import net.corda.core.node.services.CordaService -import net.corda.core.node.services.IdentityService -import net.corda.core.node.services.KeyManagementService -import net.corda.core.node.services.TransactionVerifierService +import net.corda.core.messaging.* +import net.corda.core.node.* +import net.corda.core.node.services.* import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.days -import net.corda.core.utilities.debug -import net.corda.core.utilities.getOrThrow -import net.corda.core.utilities.minutes +import net.corda.core.utilities.* import net.corda.node.CordaClock import net.corda.node.VersionInfo +import net.corda.node.internal.CheckpointVerifier.verifyCheckpointsCompatible import net.corda.node.internal.classloading.requireAnnotation import net.corda.node.internal.cordapp.CordappConfigFileProvider import net.corda.node.internal.cordapp.CordappLoader @@ -69,59 +44,21 @@ import net.corda.node.internal.security.RPCSecurityManager import net.corda.node.services.ContractUpgradeHandler import net.corda.node.services.FinalityHandler import net.corda.node.services.NotaryChangeHandler -import net.corda.node.services.api.CheckpointStorage -import net.corda.node.services.api.DummyAuditService -import net.corda.node.services.api.FlowStarter -import net.corda.node.services.api.IdentityServiceInternal -import net.corda.node.services.api.MonitoringService -import net.corda.node.services.api.NetworkMapCacheBaseInternal -import net.corda.node.services.api.NetworkMapCacheInternal -import net.corda.node.services.api.NodePropertiesStore -import net.corda.node.services.api.SchedulerService -import net.corda.node.services.api.SchemaService -import net.corda.node.services.api.ServiceHubInternal -import net.corda.node.services.api.StartedNodeServices -import net.corda.node.services.api.VaultServiceInternal -import net.corda.node.services.api.WritableTransactionStorage -import net.corda.node.services.config.BFTSMaRtConfiguration -import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.NotaryConfig -import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.api.* +import net.corda.node.services.config.* import net.corda.node.services.config.shell.toShellConfig -import net.corda.node.services.config.shouldInitCrashShell import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.events.ScheduledActivityObserver import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.MessagingService -import net.corda.node.services.network.NetworkMapCacheImpl -import net.corda.node.services.network.NetworkMapClient -import net.corda.node.services.network.NetworkMapUpdater -import net.corda.node.services.network.NodeInfoWatcher -import net.corda.node.services.network.PersistentNetworkMapCache -import net.corda.node.services.persistence.AbstractPartyDescriptor -import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter -import net.corda.node.services.persistence.DBCheckpointStorage -import net.corda.node.services.persistence.DBTransactionMappingStorage -import net.corda.node.services.persistence.DBTransactionStorage -import net.corda.node.services.persistence.NodeAttachmentService -import net.corda.node.services.persistence.NodePropertiesPersistentStore +import net.corda.node.services.network.* +import net.corda.node.services.persistence.* import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.NodeSchemaService -import net.corda.node.services.statemachine.ExternalEvent -import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl -import net.corda.node.services.statemachine.SingleThreadedStateMachineManager -import net.corda.node.services.statemachine.StateMachineManager -import net.corda.node.services.statemachine.appName -import net.corda.node.services.statemachine.flowVersionAndInitiatingClass -import net.corda.node.services.transactions.BFTNonValidatingNotaryService -import net.corda.node.services.transactions.BFTSMaRt -import net.corda.node.services.transactions.RaftNonValidatingNotaryService -import net.corda.node.services.transactions.RaftUniquenessProvider -import net.corda.node.services.transactions.RaftValidatingNotaryService -import net.corda.node.services.transactions.SimpleNotaryService -import net.corda.node.services.transactions.ValidatingNotaryService +import net.corda.node.services.statemachine.* +import net.corda.node.services.transactions.* import net.corda.node.services.upgrade.ContractUpgradeServiceImpl import net.corda.node.services.vault.NodeVaultService import net.corda.node.utilities.AffinityExecutor @@ -726,6 +663,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, networkParameters: NetworkParameters): MutableList { checkpointStorage = DBCheckpointStorage() + verifyCheckpointsCompatible(checkpointStorage, cordappProvider.cordapps, versionInfo.platformVersion) + val keyManagementService = makeKeyManagementService(identityService, keyPairs, database) _services = ServiceHubInternalImpl( identityService, diff --git a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt new file mode 100644 index 0000000000..96b7704df3 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt @@ -0,0 +1,71 @@ +package net.corda.node.internal + +import net.corda.core.cordapp.Cordapp +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowLogic +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.deserialize +import net.corda.node.services.api.CheckpointStorage +import net.corda.node.services.statemachine.SubFlow +import net.corda.node.services.statemachine.SubFlowVersion + +object CheckpointVerifier { + + /** + * Verifies that all Checkpoints stored in the db can be safely loaded with the currently installed version. + * @throws CheckpointIncompatibleException if any offending checkpoint is found. + */ + fun verifyCheckpointsCompatible(checkpointStorage: CheckpointStorage, currentCordapps: List, platformVersion: Int) { + checkpointStorage.getAllCheckpoints().forEach { (_, serializedCheckpoint) -> + val checkpoint = try { + serializedCheckpoint.deserialize(context = SerializationDefaults.CHECKPOINT_CONTEXT) + } catch (e: Exception) { + throw CheckpointIncompatibleException.CannotBeDeserialisedException(e) + } + + // For each Subflow, compare the checkpointed version to the current version. + checkpoint.subFlowStack.forEach { checkFlowCompatible(it, currentCordapps, platformVersion) } + } + } + + // Throws exception when the flow is incompatible + private fun checkFlowCompatible(subFlow: SubFlow, currentCordapps: List, platformVersion: Int) { + val corDappInfo = subFlow.subFlowVersion + + if (corDappInfo.platformVersion != platformVersion) { + throw CheckpointIncompatibleException.SubFlowCoreVersionIncompatibleException(subFlow.flowClass, corDappInfo.platformVersion) + } + + if (corDappInfo is SubFlowVersion.CorDappFlow) { + val installedCordapps = currentCordapps.filter { it.name == corDappInfo.corDappName } + when (installedCordapps.size) { + 0 -> throw CheckpointIncompatibleException.FlowNotInstalledException(subFlow.flowClass) + 1 -> { + val currenCordapp = installedCordapps.first() + if (corDappInfo.corDappHash != currenCordapp.jarHash) { + throw CheckpointIncompatibleException.FlowVersionIncompatibleException(subFlow.flowClass, currenCordapp, corDappInfo.corDappHash) + } + } + else -> throw IllegalStateException("Multiple Cordapps with name ${corDappInfo.corDappName} installed.") // This should not happen + } + } + } +} + +/** + * Thrown at startup, if a checkpoint is found that is incompatible with the current code + */ +sealed class CheckpointIncompatibleException(override val message: String) : Exception() { + class CannotBeDeserialisedException(val e: Exception) : CheckpointIncompatibleException( + "Found checkpoint that cannot be deserialised using the current Corda version. Please revert to the previous version of Corda, drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again. Cause: ${e.message}") + + class SubFlowCoreVersionIncompatibleException(val flowClass: Class>, oldVersion: Int) : CheckpointIncompatibleException( + "Found checkpoint for flow: ${flowClass} that is incompatible with the current Corda platform. Please revert to the previous version of Corda (version ${oldVersion}), drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.") + + class FlowVersionIncompatibleException(val flowClass: Class>, val cordapp: Cordapp, oldHash: SecureHash) : CheckpointIncompatibleException( + "Found checkpoint for flow: ${flowClass} that is incompatible with the current installed version of ${cordapp.name}. Please reinstall the previous version of the CorDapp (with hash: ${oldHash}), drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.") + + class FlowNotInstalledException(val flowClass: Class>) : CheckpointIncompatibleException( + "Found checkpoint for flow: ${flowClass} that is no longer installed. Please install the missing CorDapp, drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.") +} + diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt index 33b04e564e..aa54420cb2 100644 --- a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt +++ b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt @@ -128,6 +128,13 @@ open class NodeStartup(val args: Array) { try { cmdlineOptions.baseDirectory.createDirectories() startNode(conf, versionInfo, startTime, cmdlineOptions) + } catch (e: CheckpointIncompatibleException) { + if (conf.devMode) { + Node.printWarning(e.message) + } else { + logger.error(e.message) + return false + } } catch (e: Exception) { if (e is Errors.NativeIoException && e.message?.contains("Address already in use") == true) { logger.error("One of the ports required by the Corda node is already in use.") diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt index c70c168266..109063eba4 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt @@ -4,6 +4,8 @@ import com.github.benmanes.caffeine.cache.Caffeine import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult import net.corda.core.cordapp.Cordapp +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.sha256 import net.corda.core.flows.* import net.corda.core.internal.* import net.corda.core.internal.cordapp.CordappImpl @@ -42,6 +44,17 @@ class CordappLoader private constructor(private val cordappJarPaths: List by lazy { loadCordapps() + coreCordapp } val appClassLoader: ClassLoader = URLClassLoader(cordappJarPaths.stream().map { it.url }.toTypedArray(), javaClass.classLoader) + // Create a map of the CorDapps that provide a Flow. If a flow is not in this map it is a Core flow. + // It also checks that there is only one CorDapp containing that flow class + val flowCordappMap: Map>, Cordapp> by lazy { + cordapps.flatMap { corDapp -> corDapp.allFlows.map { flow -> flow to corDapp } } + .groupBy { it.first } + .mapValues { + require(it.value.size == 1) { "There are multiple CorDapp jars on the classpath for flow ${it.value.first().first.name}: ${it.value.map { it.second.name }.joinToString()}." } + it.value.single().second + } + } + init { if (cordappJarPaths.isEmpty()) { logger.info("No CorDapp paths provided") @@ -121,6 +134,8 @@ class CordappLoader private constructor(private val cordappJarPaths: List listOf("main", "production").none { url.toString().contains("$it/$resource") } || listOf("net.corda.core", "net.corda.node", "net.corda.finance").none { scanPackage.startsWith(it) } } .map { url -> if (url.protocol == "jar") { // When running tests from gradle this may be a corda module jar, so restrict to scanPackage: @@ -142,16 +157,18 @@ class CordappLoader private constructor(private val cordappJarPaths: List val scanDir = url.toPath() - scanDir.walk { it.forEach { - val entryPath = "$resource/${scanDir.relativize(it).toString().replace('\\', '/')}" - val time = FileTime.from(Instant.EPOCH) - val entry = ZipEntry(entryPath).setCreationTime(time).setLastAccessTime(time).setLastModifiedTime(time) - jos.putNextEntry(entry) - if (it.isRegularFile()) { - it.copyTo(jos) + scanDir.walk { + it.forEach { + val entryPath = "$resource/${scanDir.relativize(it).toString().replace('\\', '/')}" + val time = FileTime.from(Instant.EPOCH) + val entry = ZipEntry(entryPath).setCreationTime(time).setLastAccessTime(time).setLastModifiedTime(time) + jos.putNextEntry(entry) + if (it.isRegularFile()) { + it.copyTo(jos) + } + jos.closeEntry() } - jos.closeEntry() - } } + } } cordappJar } @@ -186,7 +203,9 @@ class CordappLoader private constructor(private val cordappJarPaths: List> { return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class) } @@ -241,6 +265,10 @@ class CordappLoader private constructor(private val cordappJarPaths: List>> { + return scanResult.getConcreteClassesOfType(FlowLogic::class) + } + private fun findContractClassNames(scanResult: RestrictedScanResult): List { return coreContractClasses.flatMap { scanResult.getNamesOfClassesImplementing(it) }.distinct() } @@ -324,5 +352,12 @@ class CordappLoader private constructor(private val cordappJarPaths: List getConcreteClassesOfType(type: KClass): List> { + return scanResult.getNamesOfSubclassesOf(type.java) + .filter { it.startsWith(qualifiedNamePrefix) } + .mapNotNull { loadClass(it, type) } + .filterNot { Modifier.isAbstract(it.modifiers) } + } } } diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappProviderImpl.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappProviderImpl.kt index 3e3d50b5ae..83416b6bef 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappProviderImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappProviderImpl.kt @@ -6,6 +6,7 @@ import net.corda.core.contracts.ContractClassName import net.corda.core.cordapp.Cordapp import net.corda.core.cordapp.CordappContext import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowLogic import net.corda.core.internal.DEPLOYED_CORDAPP_UPLOADER import net.corda.core.internal.createCordappContext import net.corda.core.node.services.AttachmentId @@ -22,7 +23,6 @@ open class CordappProviderImpl(private val cordappLoader: CordappLoader, private val cordappConfigProvider: CordappConfigProvider, attachmentStorage: AttachmentStorage, private val whitelistedContractImplementations: Map>) : SingletonSerializeAsToken(), CordappProviderInternal { - companion object { private val log = loggerFor() } @@ -117,4 +117,6 @@ open class CordappProviderImpl(private val cordappLoader: CordappLoader, * @return cordapp A cordapp or null if no cordapp has the given class loaded */ fun getCordappForClass(className: String): Cordapp? = cordapps.find { it.cordappClasses.contains(className) } + + override fun getCordappForFlow(flowLogic: FlowLogic<*>) = cordappLoader.flowCordappMap[flowLogic.javaClass] } diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappProviderInternal.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappProviderInternal.kt index a29d8bab25..01b88312e6 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappProviderInternal.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappProviderInternal.kt @@ -2,7 +2,9 @@ package net.corda.node.internal.cordapp import net.corda.core.cordapp.Cordapp import net.corda.core.cordapp.CordappProvider +import net.corda.core.flows.FlowLogic interface CordappProviderInternal : CordappProvider { val cordapps: List + fun getCordappForFlow(flowLogic: FlowLogic<*>): Cordapp? } diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index 7901ea7f1e..dacc8655bd 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -1,5 +1,6 @@ package net.corda.node.services.api +import net.corda.core.cordapp.Cordapp import net.corda.core.flows.StateMachineRunId import net.corda.core.serialization.SerializedBytes import net.corda.node.services.statemachine.Checkpoint @@ -31,4 +32,4 @@ interface CheckpointStorage { * underlying database connection is closed, so any processing should happen before it is closed. */ fun getAllCheckpoints(): Stream>> -} +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt index c750a05d1e..ec51501df0 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt @@ -80,7 +80,7 @@ sealed class Event { * * @param subFlowClass the [Class] of the subflow, to be used to determine whether it's Initiating or inlined. */ - data class EnterSubFlow(val subFlowClass: Class>) : Event() + data class EnterSubFlow(val subFlowClass: Class>, val subFlowVersion: SubFlowVersion ) : Event() /** * Signal the leaving of a subflow. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 3d2b460152..c38325eeca 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -8,6 +8,7 @@ import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.channels.Channel import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext +import net.corda.core.cordapp.Cordapp import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.internal.* @@ -46,6 +47,12 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, */ fun currentStateMachine(): FlowStateMachineImpl<*>? = Strand.currentStrand() as? FlowStateMachineImpl<*> + // If no CorDapp found then it is a Core flow. + internal fun createSubFlowVersion(cordapp: Cordapp?, platformVersion: Int): SubFlowVersion { + return cordapp?.let { SubFlowVersion.CorDappFlow(platformVersion, it.name, it.jarHash) } + ?: SubFlowVersion.CoreFlow(platformVersion) + } + private val log: Logger = LoggerFactory.getLogger("net.corda.flow") private val SERIALIZER_BLOCKER = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER").apply { isAccessible = true }.get(null) @@ -99,7 +106,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, if (value) field = value else throw IllegalArgumentException("Can only set to true") } - /** + /** * Processes an event by creating the associated transition and executing it using the given executor. * Try to avoid using this directly, instead use [processEventsUntilFlowIsResumed] or [processEventImmediately] * instead. @@ -237,7 +244,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, @Suspendable override fun subFlow(subFlow: FlowLogic): R { processEventImmediately( - Event.EnterSubFlow(subFlow.javaClass), + Event.EnterSubFlow(subFlow.javaClass, + createSubFlowVersion( + serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion)), isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = true ) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index a98daaf9d9..0cf587a228 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -8,6 +8,7 @@ import co.paralleluniverse.strands.channels.Channels import com.codahale.metrics.Gauge import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext +import net.corda.core.context.InvocationOrigin import net.corda.core.flows.FlowException import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic @@ -36,6 +37,8 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.ReceivedMessage +import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion +import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor @@ -529,7 +532,9 @@ class SingleThreadedStateMachineManager( flowLogic.stateMachine = flowStateMachineImpl val frozenFlowLogic = (flowLogic as FlowLogic<*>).serialize(context = checkpointSerializationContext!!) - val initialCheckpoint = Checkpoint.create(invocationContext, flowStart, flowLogic.javaClass, frozenFlowLogic, ourIdentity, deduplicationSeed).getOrThrow() + val flowCorDappVersion= createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion) + + val initialCheckpoint = Checkpoint.create(invocationContext, flowStart, flowLogic.javaClass, frozenFlowLogic, ourIdentity, deduplicationSeed, flowCorDappVersion).getOrThrow() val startedFuture = openFuture() val initialState = StateMachineState( checkpoint = initialCheckpoint, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 9ff1edd3ca..1237128af1 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -1,6 +1,7 @@ package net.corda.node.services.statemachine import net.corda.core.context.InvocationContext +import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party @@ -70,9 +71,10 @@ data class Checkpoint( flowLogicClass: Class>, frozenFlowLogic: SerializedBytes>, ourIdentity: Party, - deduplicationSeed: String + deduplicationSeed: String, + subFlowVersion: SubFlowVersion ): Try { - return SubFlow.create(flowLogicClass).map { topLevelSubFlow -> + return SubFlow.create(flowLogicClass, subFlowVersion).map { topLevelSubFlow -> Checkpoint( invocationContext = invocationContext, ourIdentity = ourIdentity, @@ -231,3 +233,12 @@ sealed class ErrorState { } } } + +/** + * Stored per [SubFlow]. Contains metadata around the version of the code at the Checkpointing moment. + */ +sealed class SubFlowVersion { + abstract val platformVersion: Int + data class CoreFlow(override val platformVersion: Int) : SubFlowVersion() + data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion() +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SubFlow.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SubFlow.kt index 25f9d228e9..5ddd0e6630 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SubFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SubFlow.kt @@ -1,8 +1,6 @@ package net.corda.node.services.statemachine -import net.corda.core.flows.FlowInfo -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.* import net.corda.core.utilities.Try /** @@ -15,10 +13,13 @@ import net.corda.core.utilities.Try sealed class SubFlow { abstract val flowClass: Class> + // Version of the code. + abstract val subFlowVersion: SubFlowVersion + /** * An inlined subflow. */ - data class Inlined(override val flowClass: Class>) : SubFlow() + data class Inlined(override val flowClass: Class>, override val subFlowVersion: SubFlowVersion) : SubFlow() /** * An initiating subflow. @@ -30,21 +31,22 @@ sealed class SubFlow { data class Initiating( override val flowClass: Class>, val classToInitiateWith: Class>, - val flowInfo: FlowInfo + val flowInfo: FlowInfo, + override val subFlowVersion: SubFlowVersion ) : SubFlow() companion object { - fun create(flowClass: Class>): Try { + fun create(flowClass: Class>, subFlowVersion: SubFlowVersion): Try { // Are we an InitiatingFlow? val initiatingAnnotations = getInitiatingFlowAnnotations(flowClass) return when (initiatingAnnotations.size) { 0 -> { - Try.Success(Inlined(flowClass)) + Try.Success(Inlined(flowClass, subFlowVersion)) } 1 -> { val initiatingAnnotation = initiatingAnnotations[0] val flowContext = FlowInfo(initiatingAnnotation.second.version, flowClass.appName) - Try.Success(Initiating(flowClass, initiatingAnnotation.first, flowContext)) + Try.Success(Initiating(flowClass, initiatingAnnotation.first, flowContext, subFlowVersion)) } else -> { Try.Failure(IllegalArgumentException("${InitiatingFlow::class.java.name} can only be annotated " + diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 5fc4133e98..20761b95c5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -92,7 +92,7 @@ class TopLevelTransition( private fun enterSubFlowTransition(event: Event.EnterSubFlow): TransitionResult { return builder { - val subFlow = SubFlow.create(event.subFlowClass) + val subFlow = SubFlow.create(event.subFlowClass, event.subFlowVersion) when (subFlow) { is Try.Success -> { currentState = currentState.copy( diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index 220b3f18f0..76751aa8fd 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -6,9 +6,12 @@ import net.corda.core.flows.StateMachineRunId import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.serialize +import net.corda.node.internal.CheckpointIncompatibleException +import net.corda.node.internal.CheckpointVerifier import net.corda.node.internal.configureDatabase import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.Checkpoint +import net.corda.node.services.statemachine.SubFlowVersion import net.corda.node.services.statemachine.FlowStart import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -18,6 +21,7 @@ import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity import net.corda.testing.internal.LogHelper import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import org.assertj.core.api.Assertions import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before @@ -34,6 +38,7 @@ class DBCheckpointStorageTests { private companion object { val ALICE = TestIdentity(ALICE_NAME, 70).party } + @Rule @JvmField val testSerialization = SerializationEnvironmentRule() @@ -148,19 +153,42 @@ class DBCheckpointStorageTests { } } + @Test + fun `verify checkpoints compatible`() { + database.transaction { + val (id, checkpoint) = newCheckpoint(1) + checkpointStorage.addCheckpoint(id, checkpoint) + } + + database.transaction { + CheckpointVerifier.verifyCheckpointsCompatible(checkpointStorage, emptyList(), 1) + } + + database.transaction { + val (id1, checkpoint1) = newCheckpoint(2) + checkpointStorage.addCheckpoint(id1, checkpoint1) + } + + Assertions.assertThatThrownBy { + database.transaction { + CheckpointVerifier.verifyCheckpointsCompatible(checkpointStorage, emptyList(), 1) + } + }.isInstanceOf(CheckpointIncompatibleException::class.java) + } + private fun newCheckpointStorage() { database.transaction { checkpointStorage = DBCheckpointStorage() } } - private fun newCheckpoint(): Pair> { + private fun newCheckpoint(version: Int = 1): Pair> { val id = StateMachineRunId.createRandom() val logic: FlowLogic<*> = object : FlowLogic() { override fun call() {} } val frozenLogic = logic.serialize(context = SerializationDefaults.CHECKPOINT_CONTEXT) - val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, ALICE, "").getOrThrow() + val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, ALICE, "", SubFlowVersion.CoreFlow(version)).getOrThrow() return id to checkpoint.serialize(context = SerializationDefaults.CHECKPOINT_CONTEXT) } diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/MockCordappProvider.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/MockCordappProvider.kt index e04b2aa580..b7f83c21d5 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/MockCordappProvider.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/MockCordappProvider.kt @@ -2,6 +2,7 @@ package net.corda.testing.internal import net.corda.core.contracts.ContractClassName import net.corda.core.cordapp.Cordapp +import net.corda.core.crypto.SecureHash import net.corda.core.internal.TEST_UPLOADER import net.corda.core.internal.cordapp.CordappImpl import net.corda.core.node.services.AttachmentId @@ -33,7 +34,9 @@ class MockCordappProvider( serializationWhitelists = emptyList(), serializationCustomSerializers = emptyList(), customSchemas = emptySet(), - jarPath = Paths.get("").toUri().toURL()) + jarPath = Paths.get("").toUri().toURL(), + allFlows = emptyList(), + jarHash = SecureHash.allOnesHash) if (cordappRegistry.none { it.first.contractClassNames.contains(contractClassName) }) { cordappRegistry.add(Pair(cordapp, findOrImportAttachment(listOf(contractClassName), contractClassName.toByteArray(), attachments))) }