mirror of
https://github.com/corda/corda.git
synced 2025-01-29 15:43:55 +00:00
CORDA-1477 add check for code version in checkpoints (#3256)
* CORDA-1477 add check for code version in checkpoints * CORDA-1477 Comment style * CORDA-1477 address code review comments * CORDA-1477 add changelog entry * CORDA-1477 attempt to fix tests * CORDA-1477 attempt to fix tests and address code review comments * CORDA-1477 attempt to fix tests
This commit is contained in:
parent
bd3280a06a
commit
9efb1ecfe0
@ -1,6 +1,7 @@
|
||||
package net.corda.core.cordapp
|
||||
|
||||
import net.corda.core.DoNotImplement
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
@ -25,7 +26,9 @@ import java.net.URL
|
||||
* @property serializationWhitelists List of Corda plugin registries
|
||||
* @property serializationCustomSerializers List of serializers
|
||||
* @property customSchemas List of custom schemas
|
||||
* @property allFlows List of all flow classes
|
||||
* @property jarPath The path to the JAR for this CorDapp
|
||||
* @property jarHash Hash of the jar
|
||||
*/
|
||||
@DoNotImplement
|
||||
interface Cordapp {
|
||||
@ -39,6 +42,8 @@ interface Cordapp {
|
||||
val serializationWhitelists: List<SerializationWhitelist>
|
||||
val serializationCustomSerializers: List<SerializationCustomSerializer<*, *>>
|
||||
val customSchemas: Set<MappedSchema>
|
||||
val allFlows: List<Class<out FlowLogic<*>>>
|
||||
val jarPath: URL
|
||||
val cordappClasses: List<String>
|
||||
val jarHash: SecureHash.SHA256
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package net.corda.core.internal.cordapp
|
||||
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.internal.toPath
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
@ -19,7 +20,9 @@ data class CordappImpl(
|
||||
override val serializationWhitelists: List<SerializationWhitelist>,
|
||||
override val serializationCustomSerializers: List<SerializationCustomSerializer<*, *>>,
|
||||
override val customSchemas: Set<MappedSchema>,
|
||||
override val jarPath: URL) : Cordapp {
|
||||
override val allFlows: List<Class<out FlowLogic<*>>>,
|
||||
override val jarPath: URL,
|
||||
override val jarHash: SecureHash.SHA256) : Cordapp {
|
||||
override val name: String = jarPath.toPath().fileName.toString().removeSuffix(".jar")
|
||||
|
||||
/**
|
||||
|
@ -13,6 +13,8 @@ Unreleased
|
||||
|
||||
* Shell now kills an ongoing flow when CTRL+C is pressed in the terminal.
|
||||
|
||||
* Add check at startup that all persisted Checkpoints are compatible with the current version of the code.
|
||||
|
||||
* ``ServiceHub`` and ``CordaRPCOps`` can now safely be used from multiple threads without incurring in database transaction problems.
|
||||
|
||||
* Doorman and NetworkMap url's can now be configured individually rather than being assumed to be
|
||||
|
@ -10,16 +10,7 @@ import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.crypto.newSecureRandom
|
||||
import net.corda.core.crypto.sign
|
||||
import net.corda.core.flows.ContractUpgradeFlow
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowLogicRefFactory
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.NotaryChangeFlow
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.flows.StartableByService
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
@ -31,33 +22,17 @@ import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.NotaryService
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowHandleImpl
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.messaging.FlowProgressHandleImpl
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.node.AppServiceHub
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.ServicesForResolution
|
||||
import net.corda.core.node.services.AttachmentStorage
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.core.node.services.TransactionVerifierService
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.node.*
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.CordaClock
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.CheckpointVerifier.verifyCheckpointsCompatible
|
||||
import net.corda.node.internal.classloading.requireAnnotation
|
||||
import net.corda.node.internal.cordapp.CordappConfigFileProvider
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
@ -69,59 +44,21 @@ import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.ContractUpgradeHandler
|
||||
import net.corda.node.services.FinalityHandler
|
||||
import net.corda.node.services.NotaryChangeHandler
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.api.DummyAuditService
|
||||
import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.IdentityServiceInternal
|
||||
import net.corda.node.services.api.MonitoringService
|
||||
import net.corda.node.services.api.NetworkMapCacheBaseInternal
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.api.NodePropertiesStore
|
||||
import net.corda.node.services.api.SchedulerService
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.StartedNodeServices
|
||||
import net.corda.node.services.api.VaultServiceInternal
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.config.BFTSMaRtConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.api.*
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.config.shell.toShellConfig
|
||||
import net.corda.node.services.config.shouldInitCrashShell
|
||||
import net.corda.node.services.events.NodeSchedulerService
|
||||
import net.corda.node.services.events.ScheduledActivityObserver
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.PersistentKeyManagementService
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.network.NetworkMapCacheImpl
|
||||
import net.corda.node.services.network.NetworkMapClient
|
||||
import net.corda.node.services.network.NetworkMapUpdater
|
||||
import net.corda.node.services.network.NodeInfoWatcher
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.node.services.persistence.AbstractPartyDescriptor
|
||||
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
|
||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.persistence.DBTransactionMappingStorage
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.NodePropertiesPersistentStore
|
||||
import net.corda.node.services.network.*
|
||||
import net.corda.node.services.persistence.*
|
||||
import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.statemachine.ExternalEvent
|
||||
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
|
||||
import net.corda.node.services.statemachine.SingleThreadedStateMachineManager
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.statemachine.appName
|
||||
import net.corda.node.services.statemachine.flowVersionAndInitiatingClass
|
||||
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
|
||||
import net.corda.node.services.transactions.BFTSMaRt
|
||||
import net.corda.node.services.transactions.RaftNonValidatingNotaryService
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.node.services.transactions.*
|
||||
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
@ -726,6 +663,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
networkParameters: NetworkParameters): MutableList<Any> {
|
||||
checkpointStorage = DBCheckpointStorage()
|
||||
|
||||
verifyCheckpointsCompatible(checkpointStorage, cordappProvider.cordapps, versionInfo.platformVersion)
|
||||
|
||||
val keyManagementService = makeKeyManagementService(identityService, keyPairs, database)
|
||||
_services = ServiceHubInternalImpl(
|
||||
identityService,
|
||||
|
@ -0,0 +1,71 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.statemachine.SubFlow
|
||||
import net.corda.node.services.statemachine.SubFlowVersion
|
||||
|
||||
object CheckpointVerifier {
|
||||
|
||||
/**
|
||||
* Verifies that all Checkpoints stored in the db can be safely loaded with the currently installed version.
|
||||
* @throws CheckpointIncompatibleException if any offending checkpoint is found.
|
||||
*/
|
||||
fun verifyCheckpointsCompatible(checkpointStorage: CheckpointStorage, currentCordapps: List<Cordapp>, platformVersion: Int) {
|
||||
checkpointStorage.getAllCheckpoints().forEach { (_, serializedCheckpoint) ->
|
||||
val checkpoint = try {
|
||||
serializedCheckpoint.deserialize(context = SerializationDefaults.CHECKPOINT_CONTEXT)
|
||||
} catch (e: Exception) {
|
||||
throw CheckpointIncompatibleException.CannotBeDeserialisedException(e)
|
||||
}
|
||||
|
||||
// For each Subflow, compare the checkpointed version to the current version.
|
||||
checkpoint.subFlowStack.forEach { checkFlowCompatible(it, currentCordapps, platformVersion) }
|
||||
}
|
||||
}
|
||||
|
||||
// Throws exception when the flow is incompatible
|
||||
private fun checkFlowCompatible(subFlow: SubFlow, currentCordapps: List<Cordapp>, platformVersion: Int) {
|
||||
val corDappInfo = subFlow.subFlowVersion
|
||||
|
||||
if (corDappInfo.platformVersion != platformVersion) {
|
||||
throw CheckpointIncompatibleException.SubFlowCoreVersionIncompatibleException(subFlow.flowClass, corDappInfo.platformVersion)
|
||||
}
|
||||
|
||||
if (corDappInfo is SubFlowVersion.CorDappFlow) {
|
||||
val installedCordapps = currentCordapps.filter { it.name == corDappInfo.corDappName }
|
||||
when (installedCordapps.size) {
|
||||
0 -> throw CheckpointIncompatibleException.FlowNotInstalledException(subFlow.flowClass)
|
||||
1 -> {
|
||||
val currenCordapp = installedCordapps.first()
|
||||
if (corDappInfo.corDappHash != currenCordapp.jarHash) {
|
||||
throw CheckpointIncompatibleException.FlowVersionIncompatibleException(subFlow.flowClass, currenCordapp, corDappInfo.corDappHash)
|
||||
}
|
||||
}
|
||||
else -> throw IllegalStateException("Multiple Cordapps with name ${corDappInfo.corDappName} installed.") // This should not happen
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown at startup, if a checkpoint is found that is incompatible with the current code
|
||||
*/
|
||||
sealed class CheckpointIncompatibleException(override val message: String) : Exception() {
|
||||
class CannotBeDeserialisedException(val e: Exception) : CheckpointIncompatibleException(
|
||||
"Found checkpoint that cannot be deserialised using the current Corda version. Please revert to the previous version of Corda, drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again. Cause: ${e.message}")
|
||||
|
||||
class SubFlowCoreVersionIncompatibleException(val flowClass: Class<out FlowLogic<*>>, oldVersion: Int) : CheckpointIncompatibleException(
|
||||
"Found checkpoint for flow: ${flowClass} that is incompatible with the current Corda platform. Please revert to the previous version of Corda (version ${oldVersion}), drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.")
|
||||
|
||||
class FlowVersionIncompatibleException(val flowClass: Class<out FlowLogic<*>>, val cordapp: Cordapp, oldHash: SecureHash) : CheckpointIncompatibleException(
|
||||
"Found checkpoint for flow: ${flowClass} that is incompatible with the current installed version of ${cordapp.name}. Please reinstall the previous version of the CorDapp (with hash: ${oldHash}), drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.")
|
||||
|
||||
class FlowNotInstalledException(val flowClass: Class<out FlowLogic<*>>) : CheckpointIncompatibleException(
|
||||
"Found checkpoint for flow: ${flowClass} that is no longer installed. Please install the missing CorDapp, drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.")
|
||||
}
|
||||
|
@ -128,6 +128,13 @@ open class NodeStartup(val args: Array<String>) {
|
||||
try {
|
||||
cmdlineOptions.baseDirectory.createDirectories()
|
||||
startNode(conf, versionInfo, startTime, cmdlineOptions)
|
||||
} catch (e: CheckpointIncompatibleException) {
|
||||
if (conf.devMode) {
|
||||
Node.printWarning(e.message)
|
||||
} else {
|
||||
logger.error(e.message)
|
||||
return false
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
if (e is Errors.NativeIoException && e.message?.contains("Address already in use") == true) {
|
||||
logger.error("One of the ports required by the Corda node is already in use.")
|
||||
|
@ -4,6 +4,8 @@ import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner
|
||||
import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.cordapp.CordappImpl
|
||||
@ -42,6 +44,17 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
|
||||
val cordapps: List<Cordapp> by lazy { loadCordapps() + coreCordapp }
|
||||
val appClassLoader: ClassLoader = URLClassLoader(cordappJarPaths.stream().map { it.url }.toTypedArray(), javaClass.classLoader)
|
||||
|
||||
// Create a map of the CorDapps that provide a Flow. If a flow is not in this map it is a Core flow.
|
||||
// It also checks that there is only one CorDapp containing that flow class
|
||||
val flowCordappMap: Map<Class<out FlowLogic<*>>, Cordapp> by lazy {
|
||||
cordapps.flatMap { corDapp -> corDapp.allFlows.map { flow -> flow to corDapp } }
|
||||
.groupBy { it.first }
|
||||
.mapValues {
|
||||
require(it.value.size == 1) { "There are multiple CorDapp jars on the classpath for flow ${it.value.first().first.name}: ${it.value.map { it.second.name }.joinToString()}." }
|
||||
it.value.single().second
|
||||
}
|
||||
}
|
||||
|
||||
init {
|
||||
if (cordappJarPaths.isEmpty()) {
|
||||
logger.info("No CorDapp paths provided")
|
||||
@ -121,6 +134,8 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
|
||||
val resource = scanPackage.replace('.', '/')
|
||||
return this::class.java.classLoader.getResources(resource)
|
||||
.asSequence()
|
||||
// This is to only scan classes from test folders.
|
||||
.filter { url -> listOf("main", "production").none { url.toString().contains("$it/$resource") } || listOf("net.corda.core", "net.corda.node", "net.corda.finance").none { scanPackage.startsWith(it) } }
|
||||
.map { url ->
|
||||
if (url.protocol == "jar") {
|
||||
// When running tests from gradle this may be a corda module jar, so restrict to scanPackage:
|
||||
@ -142,16 +157,18 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
|
||||
logger.info("Generating a test-only CorDapp of classes discovered for package $scanPackage in $url: $cordappJar")
|
||||
JarOutputStream(cordappJar.outputStream()).use { jos ->
|
||||
val scanDir = url.toPath()
|
||||
scanDir.walk { it.forEach {
|
||||
val entryPath = "$resource/${scanDir.relativize(it).toString().replace('\\', '/')}"
|
||||
val time = FileTime.from(Instant.EPOCH)
|
||||
val entry = ZipEntry(entryPath).setCreationTime(time).setLastAccessTime(time).setLastModifiedTime(time)
|
||||
jos.putNextEntry(entry)
|
||||
if (it.isRegularFile()) {
|
||||
it.copyTo(jos)
|
||||
scanDir.walk {
|
||||
it.forEach {
|
||||
val entryPath = "$resource/${scanDir.relativize(it).toString().replace('\\', '/')}"
|
||||
val time = FileTime.from(Instant.EPOCH)
|
||||
val entry = ZipEntry(entryPath).setCreationTime(time).setLastAccessTime(time).setLastModifiedTime(time)
|
||||
jos.putNextEntry(entry)
|
||||
if (it.isRegularFile()) {
|
||||
it.copyTo(jos)
|
||||
}
|
||||
jos.closeEntry()
|
||||
}
|
||||
jos.closeEntry()
|
||||
} }
|
||||
}
|
||||
}
|
||||
cordappJar
|
||||
}
|
||||
@ -186,7 +203,9 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
|
||||
serializationWhitelists = listOf(),
|
||||
serializationCustomSerializers = listOf(),
|
||||
customSchemas = setOf(),
|
||||
jarPath = ContractUpgradeFlow.javaClass.protectionDomain.codeSource.location // Core JAR location
|
||||
allFlows = listOf(),
|
||||
jarPath = ContractUpgradeFlow.javaClass.protectionDomain.codeSource.location, // Core JAR location
|
||||
jarHash = SecureHash.allOnesHash
|
||||
)
|
||||
}
|
||||
|
||||
@ -202,10 +221,15 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
|
||||
findPlugins(it),
|
||||
findSerializers(scanResult),
|
||||
findCustomSchemas(scanResult),
|
||||
it.url)
|
||||
findAllFlows(scanResult),
|
||||
it.url,
|
||||
getJarHash(it.url)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private fun getJarHash(url: URL): SecureHash.SHA256 = url.openStream().readFully().sha256()
|
||||
|
||||
private fun findServices(scanResult: RestrictedScanResult): List<Class<out SerializeAsToken>> {
|
||||
return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class)
|
||||
}
|
||||
@ -241,6 +265,10 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
|
||||
return scanResult.getClassesWithAnnotation(FlowLogic::class, SchedulableFlow::class)
|
||||
}
|
||||
|
||||
private fun findAllFlows(scanResult: RestrictedScanResult): List<Class<out FlowLogic<*>>> {
|
||||
return scanResult.getConcreteClassesOfType(FlowLogic::class)
|
||||
}
|
||||
|
||||
private fun findContractClassNames(scanResult: RestrictedScanResult): List<String> {
|
||||
return coreContractClasses.flatMap { scanResult.getNamesOfClassesImplementing(it) }.distinct()
|
||||
}
|
||||
@ -324,5 +352,12 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
|
||||
.mapNotNull { loadClass(it, type) }
|
||||
.filterNot { Modifier.isAbstract(it.modifiers) }
|
||||
}
|
||||
|
||||
fun <T : Any> getConcreteClassesOfType(type: KClass<T>): List<Class<out T>> {
|
||||
return scanResult.getNamesOfSubclassesOf(type.java)
|
||||
.filter { it.startsWith(qualifiedNamePrefix) }
|
||||
.mapNotNull { loadClass(it, type) }
|
||||
.filterNot { Modifier.isAbstract(it.modifiers) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import net.corda.core.contracts.ContractClassName
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.cordapp.CordappContext
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.internal.DEPLOYED_CORDAPP_UPLOADER
|
||||
import net.corda.core.internal.createCordappContext
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
@ -22,7 +23,6 @@ open class CordappProviderImpl(private val cordappLoader: CordappLoader,
|
||||
private val cordappConfigProvider: CordappConfigProvider,
|
||||
attachmentStorage: AttachmentStorage,
|
||||
private val whitelistedContractImplementations: Map<String, List<AttachmentId>>) : SingletonSerializeAsToken(), CordappProviderInternal {
|
||||
|
||||
companion object {
|
||||
private val log = loggerFor<CordappProviderImpl>()
|
||||
}
|
||||
@ -117,4 +117,6 @@ open class CordappProviderImpl(private val cordappLoader: CordappLoader,
|
||||
* @return cordapp A cordapp or null if no cordapp has the given class loaded
|
||||
*/
|
||||
fun getCordappForClass(className: String): Cordapp? = cordapps.find { it.cordappClasses.contains(className) }
|
||||
|
||||
override fun getCordappForFlow(flowLogic: FlowLogic<*>) = cordappLoader.flowCordappMap[flowLogic.javaClass]
|
||||
}
|
||||
|
@ -2,7 +2,9 @@ package net.corda.node.internal.cordapp
|
||||
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.cordapp.CordappProvider
|
||||
import net.corda.core.flows.FlowLogic
|
||||
|
||||
interface CordappProviderInternal : CordappProvider {
|
||||
val cordapps: List<Cordapp>
|
||||
fun getCordappForFlow(flowLogic: FlowLogic<*>): Cordapp?
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.services.api
|
||||
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
@ -31,4 +32,4 @@ interface CheckpointStorage {
|
||||
* underlying database connection is closed, so any processing should happen before it is closed.
|
||||
*/
|
||||
fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, SerializedBytes<Checkpoint>>>
|
||||
}
|
||||
}
|
@ -80,7 +80,7 @@ sealed class Event {
|
||||
*
|
||||
* @param subFlowClass the [Class] of the subflow, to be used to determine whether it's Initiating or inlined.
|
||||
*/
|
||||
data class EnterSubFlow(val subFlowClass: Class<FlowLogic<*>>) : Event()
|
||||
data class EnterSubFlow(val subFlowClass: Class<FlowLogic<*>>, val subFlowVersion: SubFlowVersion ) : Event()
|
||||
|
||||
/**
|
||||
* Signal the leaving of a subflow.
|
||||
|
@ -8,6 +8,7 @@ import co.paralleluniverse.strands.Strand
|
||||
import co.paralleluniverse.strands.channels.Channel
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
@ -46,6 +47,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
*/
|
||||
fun currentStateMachine(): FlowStateMachineImpl<*>? = Strand.currentStrand() as? FlowStateMachineImpl<*>
|
||||
|
||||
// If no CorDapp found then it is a Core flow.
|
||||
internal fun createSubFlowVersion(cordapp: Cordapp?, platformVersion: Int): SubFlowVersion {
|
||||
return cordapp?.let { SubFlowVersion.CorDappFlow(platformVersion, it.name, it.jarHash) }
|
||||
?: SubFlowVersion.CoreFlow(platformVersion)
|
||||
}
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger("net.corda.flow")
|
||||
|
||||
private val SERIALIZER_BLOCKER = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER").apply { isAccessible = true }.get(null)
|
||||
@ -99,7 +106,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
if (value) field = value else throw IllegalArgumentException("Can only set to true")
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* Processes an event by creating the associated transition and executing it using the given executor.
|
||||
* Try to avoid using this directly, instead use [processEventsUntilFlowIsResumed] or [processEventImmediately]
|
||||
* instead.
|
||||
@ -237,7 +244,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
@Suspendable
|
||||
override fun <R> subFlow(subFlow: FlowLogic<R>): R {
|
||||
processEventImmediately(
|
||||
Event.EnterSubFlow(subFlow.javaClass),
|
||||
Event.EnterSubFlow(subFlow.javaClass,
|
||||
createSubFlowVersion(
|
||||
serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion)),
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = true
|
||||
)
|
||||
|
@ -8,6 +8,7 @@ import co.paralleluniverse.strands.channels.Channels
|
||||
import com.codahale.metrics.Gauge
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.context.InvocationOrigin
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowInfo
|
||||
import net.corda.core.flows.FlowLogic
|
||||
@ -36,6 +37,8 @@ import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.config.shouldCheckCheckpoints
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.messaging.ReceivedMessage
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
|
||||
import net.corda.node.services.statemachine.interceptors.*
|
||||
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
|
||||
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
|
||||
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
|
||||
@ -529,7 +532,9 @@ class SingleThreadedStateMachineManager(
|
||||
flowLogic.stateMachine = flowStateMachineImpl
|
||||
val frozenFlowLogic = (flowLogic as FlowLogic<*>).serialize(context = checkpointSerializationContext!!)
|
||||
|
||||
val initialCheckpoint = Checkpoint.create(invocationContext, flowStart, flowLogic.javaClass, frozenFlowLogic, ourIdentity, deduplicationSeed).getOrThrow()
|
||||
val flowCorDappVersion= createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
|
||||
|
||||
val initialCheckpoint = Checkpoint.create(invocationContext, flowStart, flowLogic.javaClass, frozenFlowLogic, ourIdentity, deduplicationSeed, flowCorDappVersion).getOrThrow()
|
||||
val startedFuture = openFuture<Unit>()
|
||||
val initialState = StateMachineState(
|
||||
checkpoint = initialCheckpoint,
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowInfo
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.identity.Party
|
||||
@ -70,9 +71,10 @@ data class Checkpoint(
|
||||
flowLogicClass: Class<FlowLogic<*>>,
|
||||
frozenFlowLogic: SerializedBytes<FlowLogic<*>>,
|
||||
ourIdentity: Party,
|
||||
deduplicationSeed: String
|
||||
deduplicationSeed: String,
|
||||
subFlowVersion: SubFlowVersion
|
||||
): Try<Checkpoint> {
|
||||
return SubFlow.create(flowLogicClass).map { topLevelSubFlow ->
|
||||
return SubFlow.create(flowLogicClass, subFlowVersion).map { topLevelSubFlow ->
|
||||
Checkpoint(
|
||||
invocationContext = invocationContext,
|
||||
ourIdentity = ourIdentity,
|
||||
@ -231,3 +233,12 @@ sealed class ErrorState {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stored per [SubFlow]. Contains metadata around the version of the code at the Checkpointing moment.
|
||||
*/
|
||||
sealed class SubFlowVersion {
|
||||
abstract val platformVersion: Int
|
||||
data class CoreFlow(override val platformVersion: Int) : SubFlowVersion()
|
||||
data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion()
|
||||
}
|
@ -1,8 +1,6 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.flows.FlowInfo
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.utilities.Try
|
||||
|
||||
/**
|
||||
@ -15,10 +13,13 @@ import net.corda.core.utilities.Try
|
||||
sealed class SubFlow {
|
||||
abstract val flowClass: Class<out FlowLogic<*>>
|
||||
|
||||
// Version of the code.
|
||||
abstract val subFlowVersion: SubFlowVersion
|
||||
|
||||
/**
|
||||
* An inlined subflow.
|
||||
*/
|
||||
data class Inlined(override val flowClass: Class<FlowLogic<*>>) : SubFlow()
|
||||
data class Inlined(override val flowClass: Class<FlowLogic<*>>, override val subFlowVersion: SubFlowVersion) : SubFlow()
|
||||
|
||||
/**
|
||||
* An initiating subflow.
|
||||
@ -30,21 +31,22 @@ sealed class SubFlow {
|
||||
data class Initiating(
|
||||
override val flowClass: Class<FlowLogic<*>>,
|
||||
val classToInitiateWith: Class<in FlowLogic<*>>,
|
||||
val flowInfo: FlowInfo
|
||||
val flowInfo: FlowInfo,
|
||||
override val subFlowVersion: SubFlowVersion
|
||||
) : SubFlow()
|
||||
|
||||
companion object {
|
||||
fun create(flowClass: Class<FlowLogic<*>>): Try<SubFlow> {
|
||||
fun create(flowClass: Class<FlowLogic<*>>, subFlowVersion: SubFlowVersion): Try<SubFlow> {
|
||||
// Are we an InitiatingFlow?
|
||||
val initiatingAnnotations = getInitiatingFlowAnnotations(flowClass)
|
||||
return when (initiatingAnnotations.size) {
|
||||
0 -> {
|
||||
Try.Success(Inlined(flowClass))
|
||||
Try.Success(Inlined(flowClass, subFlowVersion))
|
||||
}
|
||||
1 -> {
|
||||
val initiatingAnnotation = initiatingAnnotations[0]
|
||||
val flowContext = FlowInfo(initiatingAnnotation.second.version, flowClass.appName)
|
||||
Try.Success(Initiating(flowClass, initiatingAnnotation.first, flowContext))
|
||||
Try.Success(Initiating(flowClass, initiatingAnnotation.first, flowContext, subFlowVersion))
|
||||
}
|
||||
else -> {
|
||||
Try.Failure(IllegalArgumentException("${InitiatingFlow::class.java.name} can only be annotated " +
|
||||
|
@ -92,7 +92,7 @@ class TopLevelTransition(
|
||||
|
||||
private fun enterSubFlowTransition(event: Event.EnterSubFlow): TransitionResult {
|
||||
return builder {
|
||||
val subFlow = SubFlow.create(event.subFlowClass)
|
||||
val subFlow = SubFlow.create(event.subFlowClass, event.subFlowVersion)
|
||||
when (subFlow) {
|
||||
is Try.Success -> {
|
||||
currentState = currentState.copy(
|
||||
|
@ -6,9 +6,12 @@ import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.internal.CheckpointIncompatibleException
|
||||
import net.corda.node.internal.CheckpointVerifier
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.node.services.statemachine.SubFlowVersion
|
||||
import net.corda.node.services.statemachine.FlowStart
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
@ -18,6 +21,7 @@ import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
@ -34,6 +38,7 @@ class DBCheckpointStorageTests {
|
||||
private companion object {
|
||||
val ALICE = TestIdentity(ALICE_NAME, 70).party
|
||||
}
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
@ -148,19 +153,42 @@ class DBCheckpointStorageTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `verify checkpoints compatible`() {
|
||||
database.transaction {
|
||||
val (id, checkpoint) = newCheckpoint(1)
|
||||
checkpointStorage.addCheckpoint(id, checkpoint)
|
||||
}
|
||||
|
||||
database.transaction {
|
||||
CheckpointVerifier.verifyCheckpointsCompatible(checkpointStorage, emptyList(), 1)
|
||||
}
|
||||
|
||||
database.transaction {
|
||||
val (id1, checkpoint1) = newCheckpoint(2)
|
||||
checkpointStorage.addCheckpoint(id1, checkpoint1)
|
||||
}
|
||||
|
||||
Assertions.assertThatThrownBy {
|
||||
database.transaction {
|
||||
CheckpointVerifier.verifyCheckpointsCompatible(checkpointStorage, emptyList(), 1)
|
||||
}
|
||||
}.isInstanceOf(CheckpointIncompatibleException::class.java)
|
||||
}
|
||||
|
||||
private fun newCheckpointStorage() {
|
||||
database.transaction {
|
||||
checkpointStorage = DBCheckpointStorage()
|
||||
}
|
||||
}
|
||||
|
||||
private fun newCheckpoint(): Pair<StateMachineRunId, SerializedBytes<Checkpoint>> {
|
||||
private fun newCheckpoint(version: Int = 1): Pair<StateMachineRunId, SerializedBytes<Checkpoint>> {
|
||||
val id = StateMachineRunId.createRandom()
|
||||
val logic: FlowLogic<*> = object : FlowLogic<Unit>() {
|
||||
override fun call() {}
|
||||
}
|
||||
val frozenLogic = logic.serialize(context = SerializationDefaults.CHECKPOINT_CONTEXT)
|
||||
val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, ALICE, "").getOrThrow()
|
||||
val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, ALICE, "", SubFlowVersion.CoreFlow(version)).getOrThrow()
|
||||
return id to checkpoint.serialize(context = SerializationDefaults.CHECKPOINT_CONTEXT)
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package net.corda.testing.internal
|
||||
|
||||
import net.corda.core.contracts.ContractClassName
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.TEST_UPLOADER
|
||||
import net.corda.core.internal.cordapp.CordappImpl
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
@ -33,7 +34,9 @@ class MockCordappProvider(
|
||||
serializationWhitelists = emptyList(),
|
||||
serializationCustomSerializers = emptyList(),
|
||||
customSchemas = emptySet(),
|
||||
jarPath = Paths.get("").toUri().toURL())
|
||||
jarPath = Paths.get("").toUri().toURL(),
|
||||
allFlows = emptyList(),
|
||||
jarHash = SecureHash.allOnesHash)
|
||||
if (cordappRegistry.none { it.first.contractClassNames.contains(contractClassName) }) {
|
||||
cordappRegistry.add(Pair(cordapp, findOrImportAttachment(listOf(contractClassName), contractClassName.toByteArray(), attachments)))
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user