mirror of
https://github.com/corda/corda.git
synced 2025-01-29 15:43:55 +00:00
[CORDA-2487] Provide a migration for vault states when moving from V3 to V4.1 (#4699)
* Create a new custom migration for populating the state_party table and the relevancy_status column within the vault_states table when migrating from V3 to V4. * Add a corresponding unit test suite. * Small refactorings to expose data required by the migration: the isRelevant function in the NodeVaultService, and the node's legal name from the configuration.
This commit is contained in:
parent
46188598c4
commit
4e2a295eb6
@ -27,6 +27,9 @@ Unreleased
|
||||
|
||||
The only exception to this is ``Interpolator`` and related classes. These are now in the `IRS demo workflows CorDapp <https://github.com/corda/corda/tree/master/samples/irs-demo/cordapp/workflows-irs>`_.
|
||||
|
||||
* Vault states are now correctly migrated when moving from V3 to V4.1. In particular, this means the relevancy column is correctly filled, and the state party table is populated.
|
||||
Note: This means Corda can be slow to start up for the first time after upgrading from V3 to V4.1.
|
||||
|
||||
.. _changelog_v4.0:
|
||||
|
||||
Version 4.0
|
||||
|
@ -93,7 +93,8 @@ class CordaPersistence(
|
||||
val jdbcUrl: String,
|
||||
cacheFactory: NamedCacheFactory,
|
||||
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet(),
|
||||
customClassLoader: ClassLoader? = null
|
||||
customClassLoader: ClassLoader? = null,
|
||||
val closeConnection: Boolean = true
|
||||
) : Closeable {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
|
@ -96,7 +96,10 @@ class DatabaseTransaction(
|
||||
if (sessionDelegate.isInitialized() && session.isOpen) {
|
||||
session.close()
|
||||
}
|
||||
connection.close()
|
||||
|
||||
if (database.closeConnection) {
|
||||
connection.close()
|
||||
}
|
||||
contextTransactionOrNull = outerTransaction
|
||||
if (outerTransaction == null) {
|
||||
synchronized(this) {
|
||||
|
@ -8,25 +8,34 @@ import liquibase.database.Database
|
||||
import liquibase.database.DatabaseFactory
|
||||
import liquibase.database.jvm.JdbcConnection
|
||||
import liquibase.resource.ClassLoaderResourceAccessor
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.nodeapi.internal.MigrationHelpers.getMigrationResource
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import sun.security.x509.X500Name
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.InputStream
|
||||
import java.nio.file.Path
|
||||
import java.sql.Statement
|
||||
import javax.sql.DataSource
|
||||
|
||||
// Migrate the database to the current version, using liquibase.
|
||||
//
|
||||
// A note on the ourName parameter: This is used by the vault state migration to establish what the node's legal identity is when setting up
|
||||
// its copy of the identity service. It is passed through using a system property. When multiple identity support is added, this will need
|
||||
// reworking so that multiple identities can be passed to the migration.
|
||||
class SchemaMigration(
|
||||
val schemas: Set<MappedSchema>,
|
||||
val dataSource: DataSource,
|
||||
private val databaseConfig: DatabaseConfig,
|
||||
private val classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
|
||||
private val currentDirectory: Path?) {
|
||||
private val currentDirectory: Path?,
|
||||
private val ourName: CordaX500Name? = null) {
|
||||
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
const val NODE_BASE_DIR_KEY = "liquibase.nodeDaseDir"
|
||||
const val NODE_X500_NAME = "liquibase.nodeName"
|
||||
}
|
||||
|
||||
/**
|
||||
@ -93,6 +102,9 @@ class SchemaMigration(
|
||||
if (path != null) {
|
||||
System.setProperty(NODE_BASE_DIR_KEY, path) // base dir for any custom change set which may need to load a file (currently AttachmentVersionNumberMigration)
|
||||
}
|
||||
if (ourName != null) {
|
||||
System.setProperty(NODE_X500_NAME, ourName.toString())
|
||||
}
|
||||
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, changelogList, classLoader)
|
||||
|
||||
val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection)))
|
||||
|
@ -781,7 +781,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
protected open fun startDatabase() {
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
|
||||
database.startHikariPool(props, configuration.database, schemaService.internalSchemas(), metricRegistry, this.cordappLoader.appClassLoader, configuration.baseDirectory)
|
||||
database.startHikariPool(props, configuration.database, schemaService.internalSchemas(), metricRegistry, this.cordappLoader.appClassLoader, configuration.baseDirectory, configuration.myLegalName)
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
logVendorString(database, log)
|
||||
}
|
||||
@ -1093,10 +1093,10 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
|
||||
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, cacheFactory, attributeConverters, customClassLoader)
|
||||
}
|
||||
|
||||
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, classloader: ClassLoader = Thread.currentThread().contextClassLoader, currentDir: Path? = null) {
|
||||
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, classloader: ClassLoader = Thread.currentThread().contextClassLoader, currentDir: Path? = null, ourName: CordaX500Name? = null) {
|
||||
try {
|
||||
val dataSource = DataSourceFactory.createDataSource(hikariProperties, metricRegistry = metricRegistry)
|
||||
val schemaMigration = SchemaMigration(schemas, dataSource, databaseConfig, classloader, currentDir)
|
||||
val schemaMigration = SchemaMigration(schemas, dataSource, databaseConfig, classloader, currentDir, ourName)
|
||||
schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })
|
||||
start(dataSource)
|
||||
} catch (ex: Exception) {
|
||||
|
145
node/src/main/kotlin/net/corda/node/migration/CordaMigration.kt
Normal file
145
node/src/main/kotlin/net/corda/node/migration/CordaMigration.kt
Normal file
@ -0,0 +1,145 @@
|
||||
package net.corda.node.migration
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import liquibase.change.custom.CustomTaskChange
|
||||
import liquibase.database.Database
|
||||
import liquibase.database.jvm.JdbcConnection
|
||||
import liquibase.exception.ValidationErrors
|
||||
import liquibase.resource.ResourceAccessor
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.persistence.PublicKeyToTextConverter
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration.Companion.NODE_X500_NAME
|
||||
import java.io.PrintWriter
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLFeatureNotSupportedException
|
||||
import java.util.logging.Logger
|
||||
import javax.sql.DataSource
|
||||
|
||||
/**
|
||||
* Provide a set of node services for use when migrating items in the database.
|
||||
*
|
||||
* For more complex migrations, information such as the transaction data may need to be extracted from the database. In order to do this,
|
||||
* some node services need to be initialised. This sets up enough of the node services to access items in the database using hibernate
|
||||
* queries.
|
||||
*/
|
||||
abstract class CordaMigration : CustomTaskChange {
|
||||
val identityService: PersistentIdentityService
|
||||
get() = _identityService
|
||||
|
||||
private lateinit var _identityService: PersistentIdentityService
|
||||
|
||||
val cordaDB: CordaPersistence
|
||||
get() = _cordaDB
|
||||
|
||||
private lateinit var _cordaDB: CordaPersistence
|
||||
|
||||
val dbTransactions: WritableTransactionStorage
|
||||
get() = _dbTransactions
|
||||
|
||||
private lateinit var _dbTransactions: WritableTransactionStorage
|
||||
|
||||
/**
|
||||
* Initialise a subset of node services so that data from these can be used to perform migrations.
|
||||
*
|
||||
* This function should not be called unless the NODE_X500_NAME property is set (which should happen
|
||||
* as part of running migrations via the SchemaMigration class).
|
||||
*/
|
||||
fun initialiseNodeServices(database: Database,
|
||||
schema: Set<MappedSchema>) {
|
||||
val url = (database.connection as JdbcConnection).url
|
||||
val dataSource = MigrationDataSource(database)
|
||||
val metricRegistry = MetricRegistry()
|
||||
val cacheFactory = MigrationNamedCacheFactory(metricRegistry, null)
|
||||
_identityService = PersistentIdentityService(cacheFactory)
|
||||
_cordaDB = createDatabase(url, cacheFactory, identityService, schema)
|
||||
cordaDB.start(dataSource)
|
||||
identityService.database = cordaDB
|
||||
val ourName = CordaX500Name.parse(System.getProperty(NODE_X500_NAME))
|
||||
|
||||
cordaDB.transaction {
|
||||
identityService.ourNames = setOf(ourName)
|
||||
_dbTransactions = DBTransactionStorage(cordaDB, cacheFactory)
|
||||
}
|
||||
}
|
||||
|
||||
private fun createDatabase(jdbcUrl: String,
|
||||
cacheFactory: MigrationNamedCacheFactory,
|
||||
identityService: PersistentIdentityService,
|
||||
schema: Set<MappedSchema>): CordaPersistence {
|
||||
val configDefaults = DatabaseConfig()
|
||||
val attributeConverters = listOf(
|
||||
PublicKeyToTextConverter(),
|
||||
AbstractPartyToX500NameAsStringConverter(
|
||||
identityService::wellKnownPartyFromX500Name,
|
||||
identityService::wellKnownPartyFromAnonymous)
|
||||
)
|
||||
// Liquibase handles closing the database connection when migrations are finished. If the connection is closed here, then further
|
||||
// migrations may fail.
|
||||
return CordaPersistence(configDefaults, schema, jdbcUrl, cacheFactory, attributeConverters, closeConnection = false)
|
||||
}
|
||||
|
||||
override fun validate(database: Database?): ValidationErrors? {
|
||||
return null
|
||||
}
|
||||
|
||||
override fun setUp() {
|
||||
}
|
||||
|
||||
override fun setFileOpener(resourceAccessor: ResourceAccessor?) {
|
||||
}
|
||||
|
||||
override fun getConfirmationMessage(): String? {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap the liquibase database as a DataSource, so it can be used with a CordaPersistence instance
|
||||
*/
|
||||
class MigrationDataSource(val database: Database) : DataSource {
|
||||
override fun getConnection(): Connection {
|
||||
return (database.connection as JdbcConnection).wrappedConnection
|
||||
}
|
||||
|
||||
override fun getConnection(username: String?, password: String?): Connection {
|
||||
return this.connection
|
||||
}
|
||||
|
||||
private var _loginTimeout: Int = 0
|
||||
|
||||
|
||||
override fun setLoginTimeout(seconds: Int) {
|
||||
_loginTimeout = seconds
|
||||
}
|
||||
|
||||
override fun getLoginTimeout(): Int {
|
||||
return _loginTimeout
|
||||
}
|
||||
|
||||
override fun setLogWriter(out: PrintWriter?) {
|
||||
// No implementation required.
|
||||
}
|
||||
|
||||
override fun isWrapperFor(iface: Class<*>?): Boolean {
|
||||
return this.connection.isWrapperFor(iface)
|
||||
}
|
||||
|
||||
override fun getLogWriter(): PrintWriter? {
|
||||
return null
|
||||
}
|
||||
|
||||
override fun <T : Any?> unwrap(iface: Class<T>?): T {
|
||||
return this.connection.unwrap(iface)
|
||||
}
|
||||
|
||||
override fun getParentLogger(): Logger {
|
||||
throw SQLFeatureNotSupportedException()
|
||||
}
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
package net.corda.node.migration
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.utilities.BindableNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import java.lang.IllegalArgumentException
|
||||
|
||||
// A cache factory suitable for use while migrating the database to a new version. This version does not need node configuration in order to
|
||||
// construct a cache.
|
||||
class MigrationNamedCacheFactory(private val metricRegistry: MetricRegistry?,
|
||||
private val nodeConfiguration: NodeConfiguration?) : BindableNamedCacheFactory, SingletonSerializeAsToken() {
|
||||
|
||||
override fun bindWithMetrics(metricRegistry: MetricRegistry) = MigrationNamedCacheFactory(metricRegistry, this.nodeConfiguration)
|
||||
override fun bindWithConfig(nodeConfiguration: NodeConfiguration) = MigrationNamedCacheFactory(this.metricRegistry, nodeConfiguration)
|
||||
|
||||
private fun <K, V> configuredForNamed(caffeine: Caffeine<K, V>, name: String): Caffeine<K, V> {
|
||||
return when(name) {
|
||||
"HibernateConfiguration_sessionFactories" -> caffeine.maximumSize(
|
||||
nodeConfiguration?.database?.mappedSchemaCacheSize ?: DatabaseConfig.Defaults.mappedSchemaCacheSize
|
||||
)
|
||||
"DBTransactionStorage_transactions" -> caffeine.maximumWeight(
|
||||
nodeConfiguration?.transactionCacheSizeBytes ?: NodeConfiguration.defaultTransactionCacheSize
|
||||
)
|
||||
"PersistentIdentityService_partyByKey" -> caffeine.maximumSize(defaultCacheSize)
|
||||
"PersistentIdentityService_partyByName" -> caffeine.maximumSize(defaultCacheSize)
|
||||
"BasicHSMKeyManagementService_keys" -> caffeine.maximumSize(defaultCacheSize)
|
||||
else -> throw IllegalArgumentException("Unexpected cache name $name.")
|
||||
}
|
||||
}
|
||||
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String): Cache<K, V> {
|
||||
return configuredForNamed(caffeine, name).build()
|
||||
}
|
||||
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
|
||||
return configuredForNamed(caffeine, name).build(loader)
|
||||
}
|
||||
|
||||
private val defaultCacheSize = 1024L
|
||||
}
|
@ -0,0 +1,312 @@
|
||||
package net.corda.node.migration
|
||||
|
||||
import liquibase.database.Database
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.internal.*
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.BasicHSMKeyManagementService
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.vault.VaultSchemaV1
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import net.corda.serialization.internal.AMQP_P2P_CONTEXT
|
||||
import net.corda.serialization.internal.AMQP_STORAGE_CONTEXT
|
||||
import net.corda.serialization.internal.CordaSerializationMagic
|
||||
import net.corda.serialization.internal.SerializationFactoryImpl
|
||||
import net.corda.serialization.internal.amqp.AbstractAMQPSerializationScheme
|
||||
import net.corda.serialization.internal.amqp.amqpMagic
|
||||
import org.hibernate.Session
|
||||
import org.hibernate.query.Query
|
||||
import java.util.concurrent.ForkJoinPool
|
||||
import java.util.concurrent.ForkJoinTask
|
||||
import java.util.concurrent.RecursiveAction
|
||||
import javax.persistence.criteria.Root
|
||||
import javax.persistence.criteria.Selection
|
||||
|
||||
class VaultStateMigration : CordaMigration() {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
private fun addStateParties(session: Session, stateAndRef: StateAndRef<ContractState>) {
|
||||
val state = stateAndRef.state.data
|
||||
val persistentStateRef = PersistentStateRef(stateAndRef.ref)
|
||||
try {
|
||||
state.participants.groupBy { it.owningKey }.forEach { participants ->
|
||||
val persistentParty = VaultSchemaV1.PersistentParty(persistentStateRef, participants.value.first())
|
||||
session.persist(persistentParty)
|
||||
}
|
||||
} catch (e: AbstractMethodError) {
|
||||
throw VaultStateMigrationException("Cannot add state parties as state class is not on the classpath " +
|
||||
"and participants cannot be synthesised")
|
||||
}
|
||||
}
|
||||
|
||||
private fun getStateAndRef(persistentState: VaultSchemaV1.VaultStates): StateAndRef<ContractState> {
|
||||
val persistentStateRef = persistentState.stateRef ?:
|
||||
throw VaultStateMigrationException("Persistent state ref missing from state")
|
||||
val txHash = SecureHash.parse(persistentStateRef.txId)
|
||||
val tx = dbTransactions.getTransaction(txHash) ?:
|
||||
throw VaultStateMigrationException("Transaction $txHash not present in vault")
|
||||
val state = tx.coreTransaction.outputs[persistentStateRef.index]
|
||||
val stateRef = StateRef(txHash, persistentStateRef.index)
|
||||
return StateAndRef(state, stateRef)
|
||||
}
|
||||
|
||||
override fun execute(database: Database?) {
|
||||
logger.info("Migrating vault state data to V4 tables")
|
||||
if (database == null) {
|
||||
logger.warn("Cannot migrate vault states: Liquibase failed to provide a suitable database connection")
|
||||
return
|
||||
}
|
||||
initialiseNodeServices(database, setOf(VaultMigrationSchemaV1, VaultSchemaV1))
|
||||
|
||||
val persistentStates = VaultStateIterator(cordaDB)
|
||||
VaultStateIterator.withSerializationEnv {
|
||||
persistentStates.forEach {
|
||||
val session = currentDBSession()
|
||||
try {
|
||||
val stateAndRef = getStateAndRef(it)
|
||||
|
||||
addStateParties(session, stateAndRef)
|
||||
|
||||
// Can get away without checking for AbstractMethodErrors here as these will have already occurred when trying to add
|
||||
// state parties.
|
||||
val myKeys = identityService.stripNotOurKeys(stateAndRef.state.data.participants.map { participant ->
|
||||
participant.owningKey
|
||||
}).toSet()
|
||||
if (!NodeVaultService.isRelevant(stateAndRef.state.data, myKeys)) {
|
||||
it.relevancyStatus = Vault.RelevancyStatus.NOT_RELEVANT
|
||||
}
|
||||
} catch (e: VaultStateMigrationException) {
|
||||
logger.warn("An error occurred while migrating a vault state: ${e.message}. Skipping")
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info("Finished performing vault state data migration for ${persistentStates.numStates} states")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A minimal set of schema for retrieving data from the database.
|
||||
*
|
||||
* Note that adding an extra schema here may cause migrations to fail if it ends up creating a table before the same table
|
||||
* is created in a migration script. As such, this migration must be run after the tables for the following have been created (and,
|
||||
* if they are removed in the future, before they are deleted).
|
||||
*/
|
||||
object VaultMigrationSchema
|
||||
|
||||
object VaultMigrationSchemaV1 : MappedSchema(schemaFamily = VaultMigrationSchema.javaClass, version = 1,
|
||||
mappedTypes = listOf(
|
||||
DBTransactionStorage.DBTransaction::class.java,
|
||||
PersistentIdentityService.PersistentIdentity::class.java,
|
||||
PersistentIdentityService.PersistentIdentityNames::class.java,
|
||||
BasicHSMKeyManagementService.PersistentKey::class.java
|
||||
)
|
||||
)
|
||||
|
||||
/**
|
||||
* Provides a mechanism for iterating through all persistent vault states.
|
||||
*
|
||||
* This class ensures that changes to persistent states are periodically committed and flushed. This prevents out of memory issues when
|
||||
* there are a large number of states.
|
||||
*
|
||||
* Currently, this class filters out those persistent states that have entries in the state party table. This behaviour is required for the
|
||||
* vault state migration, as entries in this table should not be duplicated. Unconsumed states are also filtered out for performance.
|
||||
*/
|
||||
class VaultStateIterator(private val database: CordaPersistence) : Iterator<VaultSchemaV1.VaultStates> {
|
||||
companion object {
|
||||
val logger = contextLogger()
|
||||
|
||||
private object AMQPInspectorSerializationScheme : AbstractAMQPSerializationScheme(emptyList()) {
|
||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||
return magic == amqpMagic
|
||||
}
|
||||
|
||||
override fun rpcClientSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
|
||||
override fun rpcServerSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
private fun initialiseSerialization() {
|
||||
// Deserialise with the lenient carpenter as we only care for the AMQP field getters
|
||||
_inheritableContextSerializationEnv.set(SerializationEnvironment.with(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(AMQPInspectorSerializationScheme)
|
||||
},
|
||||
p2pContext = AMQP_P2P_CONTEXT.withLenientCarpenter(),
|
||||
storageContext = AMQP_STORAGE_CONTEXT.withLenientCarpenter()
|
||||
))
|
||||
}
|
||||
|
||||
private fun disableSerialization() {
|
||||
_inheritableContextSerializationEnv.set(null)
|
||||
}
|
||||
|
||||
fun withSerializationEnv(block: () -> Unit) {
|
||||
val newEnv = if (_allEnabledSerializationEnvs.isEmpty()) {
|
||||
initialiseSerialization()
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
effectiveSerializationEnv.serializationFactory.withCurrentContext(effectiveSerializationEnv.storageContext.withLenientCarpenter()) {
|
||||
block()
|
||||
}
|
||||
|
||||
if (newEnv) {
|
||||
disableSerialization()
|
||||
}
|
||||
}
|
||||
}
|
||||
private val criteriaBuilder = database.entityManagerFactory.criteriaBuilder
|
||||
val numStates = getTotalStates()
|
||||
|
||||
// Create a query on the vault states that does the following filtering:
|
||||
// - Returns only those states without corresponding entries in the state_party table
|
||||
// - Returns only unconsumed states (for performance reasons)
|
||||
private fun <T>createVaultStatesQuery(returnClass: Class<T>, selection: (Root<VaultSchemaV1.VaultStates>) -> Selection<T>): Query<T> {
|
||||
val session = currentDBSession()
|
||||
val criteriaQuery = criteriaBuilder.createQuery(returnClass)
|
||||
val queryRootStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val subQuery = criteriaQuery.subquery(Long::class.java)
|
||||
val subRoot = subQuery.from(VaultSchemaV1.PersistentParty::class.java)
|
||||
subQuery.select(criteriaBuilder.count(subRoot))
|
||||
subQuery.where(criteriaBuilder.equal(
|
||||
subRoot.get<VaultSchemaV1.PersistentStateRefAndKey>(VaultSchemaV1.PersistentParty::compositeKey.name)
|
||||
.get<PersistentStateRef>(VaultSchemaV1.PersistentStateRefAndKey::stateRef.name),
|
||||
queryRootStates.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name)))
|
||||
criteriaQuery.select(selection(queryRootStates))
|
||||
criteriaQuery.where(criteriaBuilder.and(
|
||||
criteriaBuilder.equal(subQuery, 0),
|
||||
criteriaBuilder.equal(queryRootStates.get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name),
|
||||
Vault.StateStatus.UNCONSUMED)))
|
||||
return session.createQuery(criteriaQuery)
|
||||
}
|
||||
|
||||
private fun getTotalStates(): Long {
|
||||
return database.transaction {
|
||||
val query = createVaultStatesQuery(Long::class.java, criteriaBuilder::count)
|
||||
val result = query.singleResult
|
||||
logger.debug("Found $result total states in the vault")
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
private val pageSize = 1000
|
||||
private var pageNumber = 0
|
||||
private var transaction: DatabaseTransaction? = null
|
||||
private var currentPage = getNextPage()
|
||||
|
||||
private fun endTransaction() {
|
||||
try {
|
||||
transaction?.commit()
|
||||
} catch (e: Exception) {
|
||||
transaction?.rollback()
|
||||
logger.error("Failed to commit transaction while iterating vault states: ${e.message}", e)
|
||||
} finally {
|
||||
transaction?.close()
|
||||
}
|
||||
}
|
||||
|
||||
private fun getNextPage(): List<VaultSchemaV1.VaultStates> {
|
||||
endTransaction()
|
||||
transaction = database.newTransaction()
|
||||
val query = createVaultStatesQuery(VaultSchemaV1.VaultStates::class.java) { it }
|
||||
// The above query excludes states that have entries in the state party table. As the iteration proceeds, each state has entries
|
||||
// added to this table. The result is that when the next page is retrieved, any results that were in the previous page are not in
|
||||
// the query at all! As such, the next set of states that need processing start at the first result.
|
||||
query.firstResult = 0
|
||||
query.maxResults = pageSize
|
||||
pageNumber++
|
||||
val result = query.resultList
|
||||
logger.debug("Loaded page $pageNumber of ${(numStates - 1 / pageNumber.toLong()) + 1}. Current page has ${result.size} vault states")
|
||||
return result
|
||||
}
|
||||
|
||||
private var currentIndex = 0
|
||||
|
||||
override fun hasNext(): Boolean {
|
||||
val nextElementPresent = currentIndex + ((pageNumber - 1) * pageSize) < numStates
|
||||
if (!nextElementPresent) {
|
||||
endTransaction()
|
||||
}
|
||||
return nextElementPresent
|
||||
}
|
||||
|
||||
override fun next(): VaultSchemaV1.VaultStates {
|
||||
if (currentIndex == pageSize) {
|
||||
currentPage = getNextPage()
|
||||
currentIndex = 0
|
||||
}
|
||||
val stateToReturn = currentPage[currentIndex]
|
||||
currentIndex++
|
||||
return stateToReturn
|
||||
}
|
||||
|
||||
// The rest of this class is an attempt at multithreading that was ultimately scuppered by liquibase not providing a connection pool.
|
||||
// This may be useful as a starting point for improving performance of the migration, so is left here. To start using it, remove the
|
||||
// serialization environment changes in the execute function in the migration, and change forEach -> parallelForEach.
|
||||
private val pool = ForkJoinPool.commonPool()
|
||||
|
||||
private class VaultPageTask(val database: CordaPersistence,
|
||||
val page: List<VaultSchemaV1.VaultStates>,
|
||||
val block: (VaultSchemaV1.VaultStates) -> Unit): RecursiveAction() {
|
||||
|
||||
private val pageSize = page.size
|
||||
private val tolerance = 10
|
||||
|
||||
override fun compute() {
|
||||
withSerializationEnv {
|
||||
if (pageSize > tolerance) {
|
||||
ForkJoinTask.invokeAll(createSubtasks())
|
||||
} else {
|
||||
applyBlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun createSubtasks(): List<VaultPageTask> {
|
||||
return listOf(VaultPageTask(database, page.subList(0, pageSize / 2), block), VaultPageTask(database, page.subList(pageSize / 2, pageSize), block))
|
||||
}
|
||||
|
||||
private fun applyBlock() {
|
||||
effectiveSerializationEnv.serializationFactory.withCurrentContext(effectiveSerializationEnv.storageContext.withLenientCarpenter()) {
|
||||
database.transaction {
|
||||
page.forEach { block(it) }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun hasNextPage(): Boolean {
|
||||
val nextPagePresent = pageNumber * pageSize < numStates
|
||||
if (!nextPagePresent) {
|
||||
endTransaction()
|
||||
}
|
||||
return nextPagePresent
|
||||
}
|
||||
|
||||
/**
|
||||
* Iterate through all states in the vault, parallelizing the work on each page of vault states.
|
||||
*/
|
||||
fun parallelForEach(block: (VaultSchemaV1.VaultStates) -> Unit) {
|
||||
pool.invoke(VaultPageTask(database, currentPage, block))
|
||||
while (hasNextPage()) {
|
||||
currentPage = getNextPage()
|
||||
pool.invoke(VaultPageTask(database, currentPage, block))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class VaultStateMigrationException(msg: String) : Exception(msg)
|
@ -87,7 +87,7 @@ interface NodeConfiguration {
|
||||
|
||||
companion object {
|
||||
// default to at least 8MB and a bit extra for larger heap sizes
|
||||
internal val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory()
|
||||
val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory()
|
||||
|
||||
internal val DEFAULT_FLOW_MONITOR_PERIOD_MILLIS: Duration = Duration.ofMinutes(1)
|
||||
internal val DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS: Duration = Duration.ofMinutes(1)
|
||||
|
@ -68,7 +68,7 @@ class BasicHSMKeyManagementService(cacheFactory: NamedCacheFactory, val identity
|
||||
: this(null, accountId, publicKey.toStringShort())
|
||||
}
|
||||
|
||||
private companion object {
|
||||
companion object {
|
||||
fun createKeyMap(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<PublicKey, PrivateKey, PersistentKey, String> {
|
||||
return AppendOnlyPersistentMap(
|
||||
cacheFactory = cacheFactory,
|
||||
|
@ -10,6 +10,7 @@ import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.serialization.internal.effectiveSerializationEnv
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
@ -50,6 +51,14 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
)
|
||||
|
||||
private companion object {
|
||||
private fun contextToUse(): SerializationContext {
|
||||
return if (effectiveSerializationEnv.serializationFactory.currentContext?.useCase == SerializationContext.UseCase.Storage) {
|
||||
effectiveSerializationEnv.serializationFactory.currentContext!!
|
||||
} else {
|
||||
SerializationDefaults.STORAGE_CONTEXT
|
||||
}
|
||||
}
|
||||
|
||||
fun createTransactionsMap(cacheFactory: NamedCacheFactory)
|
||||
: AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> {
|
||||
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
|
||||
@ -58,14 +67,14 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
toPersistentEntityKey = { it.toString() },
|
||||
fromPersistentEntity = {
|
||||
Pair(SecureHash.parse(it.txId),
|
||||
it.transaction.deserialize<SignedTransaction>(context = SerializationDefaults.STORAGE_CONTEXT)
|
||||
it.transaction.deserialize<SignedTransaction>(context = contextToUse())
|
||||
.toTxCacheValue())
|
||||
},
|
||||
toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
|
||||
DBTransaction().apply {
|
||||
txId = key.toString()
|
||||
stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString()
|
||||
transaction = value.toSignedTx().serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
transaction = value.toSignedTx().serialize(context = contextToUse()).bytes
|
||||
}
|
||||
},
|
||||
persistentEntityClass = DBTransaction::class.java,
|
||||
|
@ -60,8 +60,24 @@ class NodeVaultService(
|
||||
private val schemaService: SchemaService,
|
||||
private val appClassloader: ClassLoader
|
||||
) : SingletonSerializeAsToken(), VaultServiceInternal {
|
||||
private companion object {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
|
||||
/**
|
||||
* Establish whether a given state is relevant to a node, given the node's public keys.
|
||||
*
|
||||
* A state is relevant if any of the participants (or the owner for ownable states) has an owning key matching one of this node's
|
||||
* public keys.
|
||||
*/
|
||||
fun isRelevant(state: ContractState, myKeys: Set<PublicKey>): Boolean {
|
||||
val keysToCheck = when (state) {
|
||||
// Sometimes developers forget to add the owning key to participants for OwnableStates.
|
||||
// TODO: This logic should probably be moved to OwnableState so we can just do a simple intersection here.
|
||||
is OwnableState -> (state.participants.map { it.owningKey } + state.owner.owningKey).toSet()
|
||||
else -> state.participants.map { it.owningKey }
|
||||
}
|
||||
return keysToCheck.any { it.containsAny(myKeys) }
|
||||
}
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
@ -493,17 +509,6 @@ class NodeVaultService(
|
||||
return claimedStates
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
internal fun isRelevant(state: ContractState, myKeys: Set<PublicKey>): Boolean {
|
||||
val keysToCheck = when (state) {
|
||||
// Sometimes developers forget to add the owning key to participants for OwnableStates.
|
||||
// TODO: This logic should probably be moved to OwnableState so we can just do a simple intersection here.
|
||||
is OwnableState -> (state.participants.map { it.owningKey } + state.owner.owningKey).toSet()
|
||||
else -> state.participants.map { it.owningKey }
|
||||
}
|
||||
return keysToCheck.any { it.containsAny(myKeys) }
|
||||
}
|
||||
|
||||
@Throws(VaultQueryException::class)
|
||||
override fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): Vault.Page<T> {
|
||||
return _queryBy(criteria, paging, sorting, contractStateType, false)
|
||||
|
@ -14,4 +14,6 @@
|
||||
<include file="migration/node-core.changelog-v9.xml"/>
|
||||
<include file="migration/node-core.changelog-v10.xml"/>
|
||||
<include file="migration/node-core.changelog-v11.xml"/>
|
||||
<!-- This must run after node-core.changelog-init.xml, to prevent database columns being created twice. -->
|
||||
<include file="migration/vault-schema.changelog-v9.xml"/>
|
||||
</databaseChangeLog>
|
||||
|
@ -0,0 +1,9 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<changeSet author="R3.Corda" id="update-vault-states">
|
||||
<customChange class="net.corda.node.migration.VaultStateMigration"/>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
@ -0,0 +1,497 @@
|
||||
package net.corda.node.migration
|
||||
|
||||
import liquibase.database.Database
|
||||
import liquibase.database.jvm.JdbcConnection
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.hash
|
||||
import net.corda.core.internal.packageName
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.contracts.Commodity
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.contracts.asset.Obligation
|
||||
import net.corda.finance.contracts.asset.OnLedgerAsset
|
||||
import net.corda.finance.schemas.CashSchemaV1
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.BasicHSMKeyManagementService
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.vault.VaultSchemaV1
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.testing.internal.vault.CommodityState
|
||||
import net.corda.testing.internal.vault.DUMMY_LINEAR_CONTRACT_PROGRAM_ID
|
||||
import net.corda.testing.internal.vault.DummyLinearContract
|
||||
import net.corda.testing.internal.vault.DummyLinearStateSchemaV1
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.TestClock
|
||||
import net.corda.testing.node.makeTestIdentityService
|
||||
import org.junit.*
|
||||
import org.mockito.Mockito
|
||||
import java.security.KeyPair
|
||||
import java.time.Clock
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFalse
|
||||
|
||||
class VaultStateMigrationTest {
|
||||
companion object {
|
||||
val alice = TestIdentity(ALICE_NAME, 70)
|
||||
val bankOfCorda = TestIdentity(BOC_NAME)
|
||||
val bob = TestIdentity(BOB_NAME, 80)
|
||||
private val charlie = TestIdentity(CHARLIE_NAME, 90)
|
||||
val dummyCashIssuer = TestIdentity(CordaX500Name("Snake Oil Issuer", "London", "GB"), 10)
|
||||
val dummyNotary = TestIdentity(DUMMY_NOTARY_NAME, 20)
|
||||
val ALICE get() = alice.party
|
||||
val ALICE_IDENTITY get() = alice.identity
|
||||
val BOB get() = bob.party
|
||||
val BOB_IDENTITY get() = bob.identity
|
||||
val BOC_IDENTITY get() = bankOfCorda.identity
|
||||
val BOC_KEY get() = bankOfCorda.keyPair
|
||||
val CHARLIE get() = charlie.party
|
||||
val DUMMY_NOTARY get() = dummyNotary.party
|
||||
val bob2 = TestIdentity(BOB_NAME, 40)
|
||||
val BOB2 = bob2.party
|
||||
val BOB2_IDENTITY = bob2.identity
|
||||
|
||||
val clock: TestClock = TestClock(Clock.systemUTC())
|
||||
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
val logger = contextLogger()
|
||||
}
|
||||
|
||||
val cordappPackages = listOf(
|
||||
"net.corda.finance.contracts",
|
||||
CashSchemaV1::class.packageName,
|
||||
DummyLinearStateSchemaV1::class.packageName)
|
||||
|
||||
lateinit var liquibaseDB: Database
|
||||
lateinit var cordaDB: CordaPersistence
|
||||
lateinit var notaryServices: MockServices
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val identityService = makeTestIdentityService(dummyNotary.identity, BOB_IDENTITY, ALICE_IDENTITY)
|
||||
notaryServices = MockServices(cordappPackages, dummyNotary, identityService, dummyCashIssuer.keyPair, BOC_KEY)
|
||||
cordaDB = configureDatabase(
|
||||
makeTestDataSourceProperties(),
|
||||
DatabaseConfig(),
|
||||
notaryServices.identityService::wellKnownPartyFromX500Name,
|
||||
notaryServices.identityService::wellKnownPartyFromAnonymous,
|
||||
ourName = BOB_IDENTITY.name)
|
||||
val liquibaseConnection = Mockito.mock(JdbcConnection::class.java)
|
||||
Mockito.`when`(liquibaseConnection.url).thenReturn(cordaDB.jdbcUrl)
|
||||
Mockito.`when`(liquibaseConnection.wrappedConnection).thenReturn(cordaDB.dataSource.connection)
|
||||
liquibaseDB = Mockito.mock(Database::class.java)
|
||||
Mockito.`when`(liquibaseDB.connection).thenReturn(liquibaseConnection)
|
||||
|
||||
saveOurKeys(listOf(bob.keyPair, bob2.keyPair))
|
||||
saveAllIdentities(listOf(BOB_IDENTITY, ALICE_IDENTITY, BOC_IDENTITY, dummyNotary.identity, BOB2_IDENTITY))
|
||||
}
|
||||
|
||||
@After
|
||||
fun close() {
|
||||
cordaDB.close()
|
||||
}
|
||||
|
||||
private fun createCashTransaction(cash: Cash, value: Amount<Currency>, owner: AbstractParty): SignedTransaction {
|
||||
val tx = TransactionBuilder(DUMMY_NOTARY)
|
||||
cash.generateIssue(tx, Amount(value.quantity, Issued(bankOfCorda.ref(1), value.token)), owner, DUMMY_NOTARY)
|
||||
return notaryServices.signInitialTransaction(tx, bankOfCorda.party.owningKey)
|
||||
}
|
||||
|
||||
private fun createVaultStatesFromTransaction(tx: SignedTransaction, stateStatus: Vault.StateStatus = Vault.StateStatus.UNCONSUMED) {
|
||||
cordaDB.transaction {
|
||||
tx.coreTransaction.outputs.forEachIndexed { index, state ->
|
||||
val constraintInfo = Vault.ConstraintInfo(state.constraint)
|
||||
val persistentState = VaultSchemaV1.VaultStates(
|
||||
notary = state.notary,
|
||||
contractStateClassName = state.data.javaClass.name,
|
||||
stateStatus = stateStatus,
|
||||
recordedTime = clock.instant(),
|
||||
relevancyStatus = Vault.RelevancyStatus.RELEVANT, //Always persist as relevant to mimic V3
|
||||
constraintType = constraintInfo.type(),
|
||||
constraintData = constraintInfo.data()
|
||||
)
|
||||
persistentState.stateRef = PersistentStateRef(tx.id.toString(), index)
|
||||
session.save(persistentState)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun saveOurKeys(keys: List<KeyPair>) {
|
||||
cordaDB.transaction {
|
||||
keys.forEach {
|
||||
val persistentKey = BasicHSMKeyManagementService.PersistentKey(it.public, it.private)
|
||||
session.save(persistentKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun saveAllIdentities(identities: List<PartyAndCertificate>) {
|
||||
cordaDB.transaction {
|
||||
identities.groupBy { it.name }.forEach { name, certs ->
|
||||
val persistentIDs = certs.map { PersistentIdentityService.PersistentIdentity(it.owningKey.hash.toString(), it.certPath.encoded) }
|
||||
val persistentName = PersistentIdentityService.PersistentIdentityNames(name.toString(), certs.first().owningKey.hash.toString())
|
||||
persistentIDs.forEach { session.save(it) }
|
||||
session.save(persistentName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun storeTransaction(tx: SignedTransaction) {
|
||||
cordaDB.transaction {
|
||||
val persistentTx = DBTransactionStorage.DBTransaction(
|
||||
txId = tx.id.toString(),
|
||||
stateMachineRunId = null,
|
||||
transaction = tx.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
)
|
||||
session.save(persistentTx)
|
||||
}
|
||||
}
|
||||
|
||||
private fun getVaultStateCount(relevancyStatus: Vault.RelevancyStatus = Vault.RelevancyStatus.ALL): Long {
|
||||
return cordaDB.transaction {
|
||||
val criteriaBuilder = cordaDB.entityManagerFactory.criteriaBuilder
|
||||
val criteriaQuery = criteriaBuilder.createQuery(Long::class.java)
|
||||
val queryRootStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
criteriaQuery.select(criteriaBuilder.count(queryRootStates))
|
||||
if (relevancyStatus != Vault.RelevancyStatus.ALL) {
|
||||
criteriaQuery.where(criteriaBuilder.equal(queryRootStates.get<Vault.RelevancyStatus>("relevancyStatus"), relevancyStatus))
|
||||
}
|
||||
val query = session.createQuery(criteriaQuery)
|
||||
query.singleResult
|
||||
}
|
||||
}
|
||||
|
||||
private fun getStatePartyCount(): Long {
|
||||
return cordaDB.transaction {
|
||||
val criteriaBuilder = cordaDB.entityManagerFactory.criteriaBuilder
|
||||
val criteriaQuery = criteriaBuilder.createQuery(Long::class.java)
|
||||
val queryRootStates = criteriaQuery.from(VaultSchemaV1.PersistentParty::class.java)
|
||||
criteriaQuery.select(criteriaBuilder.count(queryRootStates))
|
||||
val query = session.createQuery(criteriaQuery)
|
||||
query.singleResult
|
||||
}
|
||||
}
|
||||
|
||||
private fun addCashStates(statesToAdd: Int, owner: AbstractParty, stateStatus: Vault.StateStatus = Vault.StateStatus.UNCONSUMED) {
|
||||
val cash = Cash()
|
||||
cordaDB.transaction {
|
||||
(1..statesToAdd).map { createCashTransaction(cash, it.DOLLARS, owner) }.forEach {
|
||||
storeTransaction(it)
|
||||
createVaultStatesFromTransaction(it, stateStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun createLinearStateTransaction(idString: String,
|
||||
parties: List<AbstractParty> = listOf(),
|
||||
linearString: String = "foo",
|
||||
linearNumber: Long = 0L,
|
||||
linearBoolean: Boolean = false): SignedTransaction {
|
||||
val tx = TransactionBuilder(notary = dummyNotary.party).apply {
|
||||
addOutputState(DummyLinearContract.State(
|
||||
linearId = UniqueIdentifier(idString),
|
||||
participants = parties,
|
||||
linearString = linearString,
|
||||
linearNumber = linearNumber,
|
||||
linearBoolean = linearBoolean,
|
||||
linearTimestamp = clock.instant()), DUMMY_LINEAR_CONTRACT_PROGRAM_ID
|
||||
)
|
||||
addCommand(dummyCommand())
|
||||
}
|
||||
return notaryServices.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
private fun addLinearStates(statesToAdd: Int, parties: List<AbstractParty>) {
|
||||
cordaDB.transaction {
|
||||
(1..statesToAdd).map { createLinearStateTransaction("A".repeat(it), parties)}.forEach {
|
||||
storeTransaction(it)
|
||||
createVaultStatesFromTransaction(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun createCommodityTransaction(amount: Amount<Issued<Commodity>>, owner: AbstractParty): SignedTransaction {
|
||||
val txBuilder = TransactionBuilder(notary = dummyNotary.party)
|
||||
OnLedgerAsset.generateIssue(txBuilder, TransactionState(CommodityState(amount, owner), Obligation.PROGRAM_ID, dummyNotary.party), Obligation.Commands.Issue())
|
||||
return notaryServices.signInitialTransaction(txBuilder)
|
||||
}
|
||||
|
||||
private fun addCommodityStates(statesToAdd: Int, owner: AbstractParty) {
|
||||
cordaDB.transaction {
|
||||
(1..statesToAdd).map{
|
||||
createCommodityTransaction(Amount(it.toLong(), Issued(bankOfCorda.ref(2), Commodity.getInstance("FCOJ")!!)), owner)
|
||||
}.forEach {
|
||||
storeTransaction(it)
|
||||
createVaultStatesFromTransaction(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T> getState(clazz: Class<T>): T {
|
||||
return cordaDB.transaction {
|
||||
val criteriaBuilder = cordaDB.entityManagerFactory.criteriaBuilder
|
||||
val criteriaQuery = criteriaBuilder.createQuery(clazz)
|
||||
val queryRootStates = criteriaQuery.from(clazz)
|
||||
criteriaQuery.select(queryRootStates)
|
||||
val query = session.createQuery(criteriaQuery)
|
||||
query.singleResult
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkStatesEqual(expected: VaultSchemaV1.VaultStates, actual: VaultSchemaV1.VaultStates) {
|
||||
assertEquals(expected.notary, actual.notary)
|
||||
assertEquals(expected.stateStatus, actual.stateStatus)
|
||||
assertEquals(expected.relevancyStatus, actual.relevancyStatus)
|
||||
}
|
||||
|
||||
private fun addToStatePartyTable(stateAndRef: StateAndRef<ContractState>) {
|
||||
cordaDB.transaction {
|
||||
val persistentStateRef = PersistentStateRef(stateAndRef.ref.txhash.toString(), stateAndRef.ref.index)
|
||||
val session = currentDBSession()
|
||||
stateAndRef.state.data.participants.forEach {
|
||||
val persistentParty = VaultSchemaV1.PersistentParty(
|
||||
persistentStateRef,
|
||||
it
|
||||
)
|
||||
session.save(persistentParty)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Check a simple migration works`() {
|
||||
addCashStates(10, BOB)
|
||||
addCashStates(10, ALICE)
|
||||
assertEquals(20, getVaultStateCount())
|
||||
assertEquals(0, getStatePartyCount())
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
assertEquals(20, getVaultStateCount())
|
||||
assertEquals(20, getStatePartyCount())
|
||||
assertEquals(10, getVaultStateCount(Vault.RelevancyStatus.RELEVANT))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Check state paging works`() {
|
||||
addCashStates(1010, BOB)
|
||||
|
||||
assertEquals(0, getStatePartyCount())
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
assertEquals(1010, getStatePartyCount())
|
||||
assertEquals(1010, getVaultStateCount())
|
||||
assertEquals(0, getVaultStateCount(Vault.RelevancyStatus.NOT_RELEVANT))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Check state fields are correct`() {
|
||||
val tx = createCashTransaction(Cash(), 100.DOLLARS, ALICE)
|
||||
storeTransaction(tx)
|
||||
createVaultStatesFromTransaction(tx)
|
||||
val expectedPersistentParty = VaultSchemaV1.PersistentParty(
|
||||
PersistentStateRef(tx.id.toString(), 0),
|
||||
ALICE
|
||||
)
|
||||
val state = tx.coreTransaction.outputs.first()
|
||||
val constraintInfo = Vault.ConstraintInfo(state.constraint)
|
||||
val expectedPersistentState = VaultSchemaV1.VaultStates(
|
||||
notary = state.notary,
|
||||
contractStateClassName = state.data.javaClass.name,
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED,
|
||||
recordedTime = clock.instant(),
|
||||
relevancyStatus = Vault.RelevancyStatus.NOT_RELEVANT,
|
||||
constraintType = constraintInfo.type(),
|
||||
constraintData = constraintInfo.data()
|
||||
)
|
||||
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
val persistentStateParty = getState(VaultSchemaV1.PersistentParty::class.java)
|
||||
val persistentState = getState(VaultSchemaV1.VaultStates::class.java)
|
||||
checkStatesEqual(expectedPersistentState, persistentState)
|
||||
assertEquals(expectedPersistentParty.x500Name, persistentStateParty.x500Name)
|
||||
assertEquals(expectedPersistentParty.compositeKey, persistentStateParty.compositeKey)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Check the connection is open post migration`() {
|
||||
// Liquibase automatically closes the database connection when doing an actual migration. This test ensures the custom migration
|
||||
// leaves it open.
|
||||
addCashStates(12, ALICE)
|
||||
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
assertFalse(cordaDB.dataSource.connection.isClosed)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `All parties added to state party table`() {
|
||||
val stx = createLinearStateTransaction("test", parties = listOf(ALICE, BOB, CHARLIE))
|
||||
storeTransaction(stx)
|
||||
createVaultStatesFromTransaction(stx)
|
||||
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
assertEquals(3, getStatePartyCount())
|
||||
assertEquals(1, getVaultStateCount())
|
||||
assertEquals(0, getVaultStateCount(Vault.RelevancyStatus.NOT_RELEVANT))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `State with corresponding transaction missing is skipped`() {
|
||||
val cash = Cash()
|
||||
val unknownTx = createCashTransaction(cash, 100.DOLLARS, BOB)
|
||||
createVaultStatesFromTransaction(unknownTx)
|
||||
|
||||
addCashStates(10, BOB)
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
assertEquals(10, getStatePartyCount())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `State with unknown ID is handled correctly`() {
|
||||
addCashStates(1, CHARLIE)
|
||||
addCashStates(10, BOB)
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
assertEquals(11, getStatePartyCount())
|
||||
assertEquals(1, getVaultStateCount(Vault.RelevancyStatus.NOT_RELEVANT))
|
||||
assertEquals(10, getVaultStateCount(Vault.RelevancyStatus.RELEVANT))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Null database causes migration to be ignored`() {
|
||||
val migration = VaultStateMigration()
|
||||
// Just check this does not throw an exception
|
||||
migration.execute(null)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `State with non-owning key for our name marked as relevant`() {
|
||||
val tx = createCashTransaction(Cash(), 100.DOLLARS, BOB2)
|
||||
storeTransaction(tx)
|
||||
createVaultStatesFromTransaction(tx)
|
||||
val state = tx.coreTransaction.outputs.first()
|
||||
val constraintInfo = Vault.ConstraintInfo(state.constraint)
|
||||
val expectedPersistentState = VaultSchemaV1.VaultStates(
|
||||
notary = state.notary,
|
||||
contractStateClassName = state.data.javaClass.name,
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED,
|
||||
recordedTime = clock.instant(),
|
||||
relevancyStatus = Vault.RelevancyStatus.RELEVANT,
|
||||
constraintType = constraintInfo.type(),
|
||||
constraintData = constraintInfo.data()
|
||||
)
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
val persistentState = getState(VaultSchemaV1.VaultStates::class.java)
|
||||
checkStatesEqual(expectedPersistentState, persistentState)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `State already in state party table is excluded`() {
|
||||
val tx = createCashTransaction(Cash(), 100.DOLLARS, BOB)
|
||||
storeTransaction(tx)
|
||||
createVaultStatesFromTransaction(tx)
|
||||
addToStatePartyTable(tx.coreTransaction.outRef(0))
|
||||
addCashStates(5, BOB)
|
||||
assertEquals(1, getStatePartyCount())
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
assertEquals(6, getStatePartyCount())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Consumed states are not migrated`() {
|
||||
addCashStates(1010, BOB, Vault.StateStatus.CONSUMED)
|
||||
assertEquals(0, getStatePartyCount())
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
assertEquals(0, getStatePartyCount())
|
||||
}
|
||||
|
||||
// Used to test migration performance
|
||||
@Test
|
||||
@Ignore
|
||||
fun `Migrate large database`() {
|
||||
val statesAtOnce = 500L
|
||||
val stateMultiplier = 300L
|
||||
logger.info("Start adding states to vault")
|
||||
(1..stateMultiplier).forEach {
|
||||
addCashStates(statesAtOnce.toInt(), BOB)
|
||||
}
|
||||
logger.info("Finish adding states to vault")
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
assertEquals((statesAtOnce * stateMultiplier), getStatePartyCount())
|
||||
}
|
||||
|
||||
private fun makePersistentDataSourceProperties(): Properties {
|
||||
val props = Properties()
|
||||
props.setProperty("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource")
|
||||
props.setProperty("dataSource.url", "jdbc:h2:~/test/persistence;DB_CLOSE_ON_EXIT=TRUE")
|
||||
props.setProperty("dataSource.user", "sa")
|
||||
props.setProperty("dataSource.password", "")
|
||||
return props
|
||||
}
|
||||
|
||||
// Used to generate a persistent database for further testing.
|
||||
@Test
|
||||
@Ignore
|
||||
fun `Create persistent DB`() {
|
||||
val cashStatesToAdd = 1000
|
||||
val linearStatesToAdd = 0
|
||||
val commodityStatesToAdd = 0
|
||||
val stateMultiplier = 10
|
||||
|
||||
cordaDB = configureDatabase(makePersistentDataSourceProperties(), DatabaseConfig(), notaryServices.identityService::wellKnownPartyFromX500Name, notaryServices.identityService::wellKnownPartyFromAnonymous)
|
||||
|
||||
// Starting the database this way runs the migration under test. This is fine for the unit tests (as the changelog table is ignored),
|
||||
// but when starting an actual node using these databases the migration will be skipped, as it has an entry in the changelog table.
|
||||
// This must therefore be removed.
|
||||
cordaDB.dataSource.connection.createStatement().use {
|
||||
it.execute("DELETE FROM DATABASECHANGELOG WHERE FILENAME IN ('migration/vault-schema.changelog-v9.xml')")
|
||||
}
|
||||
|
||||
for (i in 1..stateMultiplier) {
|
||||
addCashStates(cashStatesToAdd, BOB)
|
||||
addLinearStates(linearStatesToAdd, listOf(BOB, ALICE))
|
||||
addCommodityStates(commodityStatesToAdd, BOB)
|
||||
}
|
||||
saveOurKeys(listOf(bob.keyPair))
|
||||
saveAllIdentities(listOf(BOB_IDENTITY, ALICE_IDENTITY, BOC_IDENTITY, dummyNotary.identity))
|
||||
cordaDB.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
fun `Run on persistent DB`() {
|
||||
cordaDB = configureDatabase(makePersistentDataSourceProperties(), DatabaseConfig(), notaryServices.identityService::wellKnownPartyFromX500Name, notaryServices.identityService::wellKnownPartyFromAnonymous)
|
||||
val connection = (liquibaseDB.connection as JdbcConnection)
|
||||
Mockito.`when`(connection.url).thenReturn(cordaDB.jdbcUrl)
|
||||
Mockito.`when`(connection.wrappedConnection).thenReturn(cordaDB.dataSource.connection)
|
||||
val migration = VaultStateMigration()
|
||||
migration.execute(liquibaseDB)
|
||||
cordaDB.close()
|
||||
}
|
||||
}
|
||||
|
@ -8,6 +8,8 @@ import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.TransactionIsolationLevel
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
@ -92,7 +94,7 @@ class DbMapDeadlockTest {
|
||||
val dbConfig = DatabaseConfig(initialiseSchema = true, transactionIsolationLevel = TransactionIsolationLevel.READ_COMMITTED)
|
||||
val schemaService = NodeSchemaService(extraSchemas = setOf(LockDbSchemaV2))
|
||||
createCordaPersistence(dbConfig, { null }, { null }, schemaService, hikariProperties, cacheFactory, null).apply {
|
||||
startHikariPool(hikariProperties, dbConfig, schemaService.schemaOptions.keys)
|
||||
startHikariPool(hikariProperties, dbConfig, schemaService.schemaOptions.keys, ourName = TestIdentity(ALICE_NAME, 70).name)
|
||||
}.use { persistence ->
|
||||
|
||||
// First clean up any remains from previous test runs
|
||||
|
@ -552,21 +552,20 @@ class NodeVaultServiceTest {
|
||||
|
||||
@Test
|
||||
fun `is ownable state relevant`() {
|
||||
val service = vaultService
|
||||
val amount = Amount(1000, Issued(BOC.ref(1), GBP))
|
||||
val wellKnownCash = Cash.State(amount, identity.party)
|
||||
val myKeys = services.keyManagementService.filterMyKeys(listOf(wellKnownCash.owner.owningKey))
|
||||
assertTrue { service.isRelevant(wellKnownCash, myKeys.toSet()) }
|
||||
assertTrue { NodeVaultService.isRelevant(wellKnownCash, myKeys.toSet()) }
|
||||
|
||||
val anonymousIdentity = services.keyManagementService.freshKeyAndCert(identity, false)
|
||||
val anonymousCash = Cash.State(amount, anonymousIdentity.party)
|
||||
val anonymousKeys = services.keyManagementService.filterMyKeys(listOf(anonymousCash.owner.owningKey))
|
||||
assertTrue { service.isRelevant(anonymousCash, anonymousKeys.toSet()) }
|
||||
assertTrue { NodeVaultService.isRelevant(anonymousCash, anonymousKeys.toSet()) }
|
||||
|
||||
val thirdPartyIdentity = AnonymousParty(generateKeyPair().public)
|
||||
val thirdPartyCash = Cash.State(amount, thirdPartyIdentity)
|
||||
val thirdPartyKeys = services.keyManagementService.filterMyKeys(listOf(thirdPartyCash.owner.owningKey))
|
||||
assertFalse { service.isRelevant(thirdPartyCash, thirdPartyKeys.toSet()) }
|
||||
assertFalse { NodeVaultService.isRelevant(thirdPartyCash, thirdPartyKeys.toSet()) }
|
||||
}
|
||||
|
||||
// TODO: Unit test linear state relevancy checks
|
||||
|
@ -35,7 +35,9 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.registerDevP2pCertificates
|
||||
import net.corda.serialization.internal.amqp.AMQP_ENABLED
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.nio.file.Files
|
||||
@ -176,9 +178,10 @@ fun configureDatabase(hikariProperties: Properties,
|
||||
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
|
||||
schemaService: SchemaService = NodeSchemaService(),
|
||||
internalSchemas: Set<MappedSchema> = NodeSchemaService().internalSchemas(),
|
||||
cacheFactory: NamedCacheFactory = TestingNamedCacheFactory()): CordaPersistence {
|
||||
cacheFactory: NamedCacheFactory = TestingNamedCacheFactory(),
|
||||
ourName: CordaX500Name = TestIdentity(ALICE_NAME, 70).name): CordaPersistence {
|
||||
val persistence = createCordaPersistence(databaseConfig, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous, schemaService, hikariProperties, cacheFactory, null)
|
||||
persistence.startHikariPool(hikariProperties, databaseConfig, internalSchemas)
|
||||
persistence.startHikariPool(hikariProperties, databaseConfig, internalSchemas, ourName = ourName)
|
||||
return persistence
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user