[CORDA-2561] Use the attachments classloader to deserialise contract states in migrations (#4754)

* Use the attachments classloader to deserialize contract states in migrations

* Added some comments to explain serialisation behaviour and how tests work.

* Add debug log to indicate when attachment classloading has failed.

* Use a servicesForResolution to load states for compatibility with notary changes and contract upgrades

* Add test case to cover notary change transactions

* Address review comments

* Change logging message in MigrationServicesForResolution

* Read the network-parameters file if there is nothing in the database

* Update documentation and provide a warning if there are many states.
This commit is contained in:
JamesHR3 2019-02-17 08:24:02 +00:00 committed by Tommy Lillehagen
parent e5c7355d43
commit 9353e4dd93
7 changed files with 279 additions and 24 deletions

View File

@ -1,7 +1,5 @@
package net.corda.core.serialization.internal
import net.corda.core.CordaException
import net.corda.core.KeepForDJVM
import net.corda.core.contracts.Attachment
import net.corda.core.contracts.ContractAttachment
import net.corda.core.contracts.TransactionVerificationException
@ -292,7 +290,7 @@ class AttachmentsClassLoader(attachments: List<Attachment>,
* whitelisted classes.
*/
@VisibleForTesting
internal object AttachmentsClassLoaderBuilder {
object AttachmentsClassLoaderBuilder {
private const val CACHE_SIZE = 1000
// We use a set here because the ordering of attachments doesn't affect code execution, due to the no

View File

@ -269,6 +269,8 @@ We have open sourced the Liquibase schema upgrade feature from Corda Enterprise.
bootstrap and update itself automatically. This is a transparent change with pre Corda 4 nodes seamlessly
upgrading to operate as if they'd been bootstrapped in this way. This also applies to the finance CorDapp module.
.. important:: If you're upgrading a node from Corda 3 to Corda 4 and there is old data in the vault, this upgrade may take some time, depending on the number of unconsumed states in the vault.
Ability to pre-validate configuration files
+++++++++++++++++++++++++++++++++++++++++++

View File

@ -8,11 +8,8 @@ import liquibase.exception.ValidationErrors
import liquibase.resource.ResourceAccessor
import net.corda.core.identity.CordaX500Name
import net.corda.core.schemas.MappedSchema
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.PublicKeyToTextConverter
import net.corda.node.services.persistence.*
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.SchemaMigration.Companion.NODE_X500_NAME
@ -40,10 +37,10 @@ abstract class CordaMigration : CustomTaskChange {
private lateinit var _cordaDB: CordaPersistence
val dbTransactions: WritableTransactionStorage
get() = _dbTransactions
val servicesForResolution: MigrationServicesForResolution
get() = _servicesForResolution
private lateinit var _dbTransactions: WritableTransactionStorage
private lateinit var _servicesForResolution: MigrationServicesForResolution
/**
* Initialise a subset of node services so that data from these can be used to perform migrations.
@ -65,7 +62,9 @@ abstract class CordaMigration : CustomTaskChange {
cordaDB.transaction {
identityService.ourNames = setOf(ourName)
_dbTransactions = DBTransactionStorage(cordaDB, cacheFactory)
val dbTransactions = DBTransactionStorage(cordaDB, cacheFactory)
val attachmentsService = NodeAttachmentService(metricRegistry, cacheFactory, cordaDB)
_servicesForResolution = MigrationServicesForResolution(identityService, attachmentsService, dbTransactions, cordaDB, cacheFactory)
}
}
@ -143,3 +142,5 @@ class MigrationDataSource(val database: Database) : DataSource {
throw SQLFeatureNotSupportedException()
}
}
class MigrationException(msg: String?, cause: Exception? = null): Exception(msg, cause)

View File

@ -30,6 +30,10 @@ class MigrationNamedCacheFactory(private val metricRegistry: MetricRegistry?,
"PersistentIdentityService_partyByKey" -> caffeine.maximumSize(defaultCacheSize)
"PersistentIdentityService_partyByName" -> caffeine.maximumSize(defaultCacheSize)
"BasicHSMKeyManagementService_keys" -> caffeine.maximumSize(defaultCacheSize)
"NodeAttachmentService_attachmentContent" -> caffeine.maximumWeight(defaultCacheSize)
"NodeAttachmentService_attachmentPresence" -> caffeine.maximumSize(defaultCacheSize)
"NodeAttachmentService_contractAttachmentVersions" -> caffeine.maximumSize(defaultCacheSize)
"NodeParametersStorage_networkParametersByHash" -> caffeine.maximumSize(defaultCacheSize)
else -> throw IllegalArgumentException("Unexpected cache name $name.")
}
}

View File

@ -0,0 +1,142 @@
package net.corda.node.migration
import net.corda.core.contracts.*
import net.corda.core.cordapp.CordappProvider
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.deserialiseComponentGroup
import net.corda.core.internal.div
import net.corda.core.internal.readObject
import net.corda.core.node.NetworkParameters
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.NetworkParametersService
import net.corda.core.node.services.TransactionStorage
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.AttachmentsClassLoaderBuilder
import net.corda.core.transactions.ContractUpgradeLedgerTransaction
import net.corda.core.transactions.NotaryChangeLedgerTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.DBNetworkParametersStorage
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.SchemaMigration
import java.nio.file.Paths
import java.time.Clock
import java.time.Duration
class MigrationServicesForResolution(
override val identityService: IdentityService,
override val attachments: AttachmentStorage,
private val transactions: TransactionStorage,
private val cordaDB: CordaPersistence,
cacheFactory: MigrationNamedCacheFactory
): ServicesForResolution {
companion object {
val logger = contextLogger()
}
override val cordappProvider: CordappProvider
get() = throw NotImplementedError()
private fun defaultNetworkParameters(): NetworkParameters {
logger.warn("Using a dummy set of network parameters for migration.")
val clock = Clock.systemUTC()
return NetworkParameters(
1,
listOf(),
1,
1,
clock.instant(),
1,
mapOf(),
Duration.ZERO,
mapOf()
)
}
private fun getNetworkParametersFromFile(): SignedNetworkParameters? {
return try {
val dir = System.getProperty(SchemaMigration.NODE_BASE_DIR_KEY)
val path = Paths.get(dir) / NETWORK_PARAMS_FILE_NAME
path.readObject()
} catch (e: Exception) {
logger.info("Couldn't find network parameters file: ${e.message}. This is expected if the node is starting for the first time.")
null
}
}
override val networkParametersService: NetworkParametersService = object : NetworkParametersService {
private val storage = DBNetworkParametersStorage.createParametersMap(cacheFactory)
private val filedParams = getNetworkParametersFromFile()
override val defaultHash: SecureHash = filedParams?.raw?.hash ?: SecureHash.getZeroHash()
override val currentHash: SecureHash = cordaDB.transaction {
storage.allPersisted().maxBy { it.second.verified().epoch }?.first ?: defaultHash
}
override fun lookup(hash: SecureHash): NetworkParameters? {
// Note that the parameters in any file shouldn't be put into the database - this will be done by the node on startup.
return if (hash == filedParams?.raw?.hash) {
filedParams.raw.deserialize()
} else {
cordaDB.transaction { storage[hash]?.verified() }
}
}
}
override val networkParameters: NetworkParameters = networkParametersService.lookup(networkParametersService.currentHash)
?: getNetworkParametersFromFile()?.raw?.deserialize()
?: defaultNetworkParameters()
private fun extractStateFromTx(tx: WireTransaction, stateIndices: Collection<Int>): List<TransactionState<ContractState>> {
return try {
val attachments = tx.attachments.mapNotNull { attachments.openAttachment(it)}
val states = AttachmentsClassLoaderBuilder.withAttachmentsClassloaderContext(attachments, networkParameters, tx.id) {
deserialiseComponentGroup(tx.componentGroups, TransactionState::class, ComponentGroupEnum.OUTPUTS_GROUP, forceDeserialize = true)
}
states.filterIndexed {index, _ -> stateIndices.contains(index)}.toList()
} catch (e: Exception) {
// If there is no attachment that allows the state class to be deserialised correctly, then carpent a state class anyway. It
// might still be possible to access the participants depending on how the state class was serialised.
logger.debug("Could not use attachments to deserialise transaction output states for transaction ${tx.id}")
tx.outputs.filterIndexed { index, _ -> stateIndices.contains(index)}
}
}
override fun loadState(stateRef: StateRef): TransactionState<*> {
val stx = transactions.getTransaction(stateRef.txhash)
?: throw MigrationException("Could not get transaction with hash ${stateRef.txhash} out of vault")
val baseTx = stx.resolveBaseTransaction(this)
return when (baseTx) {
is NotaryChangeLedgerTransaction -> baseTx.outputs[stateRef.index]
is ContractUpgradeLedgerTransaction -> baseTx.outputs[stateRef.index]
is WireTransaction -> extractStateFromTx(baseTx, listOf(stateRef.index)).first()
else -> throw MigrationException("Unknown transaction type ${baseTx::class.qualifiedName} found when loading a state")
}
}
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> {
return stateRefs.groupBy { it.txhash }.flatMap {
val stx = transactions.getTransaction(it.key)
?: throw MigrationException("Could not get transaction with hash ${it.key} out of vault")
val baseTx = stx.resolveBaseTransaction(this)
val stateList = when (baseTx) {
is NotaryChangeLedgerTransaction -> it.value.map { stateRef -> StateAndRef(baseTx.outputs[stateRef.index], stateRef) }
is ContractUpgradeLedgerTransaction -> it.value.map { stateRef -> StateAndRef(baseTx.outputs[stateRef.index], stateRef) }
is WireTransaction -> extractStateFromTx(baseTx, it.value.map { stateRef -> stateRef.index })
.mapIndexed {index, state -> StateAndRef(state, StateRef(baseTx.id, index)) }
else -> throw MigrationException("Unknown transaction type ${baseTx::class.qualifiedName} found when loading a state")
}
stateList
}.toSet()
}
override fun loadContractAttachment(stateRef: StateRef): Attachment {
throw NotImplementedError()
}
}

View File

@ -1,9 +1,7 @@
package net.corda.node.migration
import liquibase.database.Database
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.Vault
import net.corda.core.schemas.MappedSchema
@ -11,9 +9,11 @@ import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.internal.*
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.DBNetworkParametersStorage
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.BasicHSMKeyManagementService
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSchemaV1
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -47,6 +47,9 @@ class VaultStateMigration : CordaMigration() {
session.persist(persistentParty)
}
} catch (e: AbstractMethodError) {
// This should only happen if there was no attachment that could be used to deserialise the output states, and the state was
// serialised such that the participants list cannot be accessed (participants is calculated and not marked as a
// SerializableCalculatedProperty.
throw VaultStateMigrationException("Cannot add state parties as state class is not on the classpath " +
"and participants cannot be synthesised")
}
@ -56,10 +59,12 @@ class VaultStateMigration : CordaMigration() {
val persistentStateRef = persistentState.stateRef ?:
throw VaultStateMigrationException("Persistent state ref missing from state")
val txHash = SecureHash.parse(persistentStateRef.txId)
val tx = dbTransactions.getTransaction(txHash) ?:
throw VaultStateMigrationException("Transaction $txHash not present in vault")
val state = tx.coreTransaction.outputs[persistentStateRef.index]
val stateRef = StateRef(txHash, persistentStateRef.index)
val state = try {
servicesForResolution.loadState(stateRef)
} catch (e: Exception) {
throw VaultStateMigrationException("Could not load state for stateRef $stateRef : ${e.message}", e)
}
return StateAndRef(state, stateRef)
}
@ -70,8 +75,12 @@ class VaultStateMigration : CordaMigration() {
return
}
initialiseNodeServices(database, setOf(VaultMigrationSchemaV1, VaultSchemaV1))
var statesSkipped = 0
val persistentStates = VaultStateIterator(cordaDB)
if (persistentStates.numStates > 0) {
logger.warn("Found ${persistentStates.numStates} states to update from a previous version. This may take a while for large "
+ "volumes of data.")
}
VaultStateIterator.withSerializationEnv {
persistentStates.forEach {
val session = currentDBSession()
@ -89,11 +98,15 @@ class VaultStateMigration : CordaMigration() {
it.relevancyStatus = Vault.RelevancyStatus.NOT_RELEVANT
}
} catch (e: VaultStateMigrationException) {
logger.warn("An error occurred while migrating a vault state: ${e.message}. Skipping")
logger.warn("An error occurred while migrating a vault state: ${e.message}. Skipping", e)
statesSkipped++
}
}
}
logger.info("Finished performing vault state data migration for ${persistentStates.numStates} states")
if (statesSkipped > 0) {
logger.error("$statesSkipped states could not be migrated as there was no class available for them.")
}
logger.info("Finished performing vault state data migration for ${persistentStates.numStates - statesSkipped} states")
}
}
@ -112,7 +125,9 @@ object VaultMigrationSchemaV1 : MappedSchema(schemaFamily = VaultMigrationSchema
DBTransactionStorage.DBTransaction::class.java,
PersistentIdentityService.PersistentIdentity::class.java,
PersistentIdentityService.PersistentIdentityNames::class.java,
BasicHSMKeyManagementService.PersistentKey::class.java
BasicHSMKeyManagementService.PersistentKey::class.java,
NodeAttachmentService.DBAttachment::class.java,
DBNetworkParametersStorage.PersistentNetworkParameters::class.java
)
)
@ -309,4 +324,4 @@ class VaultStateIterator(private val database: CordaPersistence) : Iterator<Vaul
}
}
class VaultStateMigrationException(msg: String) : Exception(msg)
class VaultStateMigrationException(msg: String, cause: Exception? = null) : Exception(msg, cause)

View File

@ -3,11 +3,16 @@ package net.corda.node.migration
import liquibase.database.Database
import liquibase.database.jvm.JdbcConnection
import net.corda.core.contracts.*
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.hash
import net.corda.core.internal.packageName
import net.corda.core.internal.*
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.Vault
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SerializationDefaults
@ -21,12 +26,15 @@ import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.asset.Obligation
import net.corda.finance.contracts.asset.OnLedgerAsset
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.DBNetworkParametersStorage
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.BasicHSMKeyManagementService
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.vault.VaultSchemaV1
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.testing.core.*
import net.corda.testing.internal.configureDatabase
@ -42,10 +50,21 @@ import org.junit.*
import org.mockito.Mockito
import java.security.KeyPair
import java.time.Clock
import java.time.Duration
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFalse
/**
* These tests aim to verify that migrating vault states from V3 to later versions works correctly. While these unit tests verify the
* migrating behaviour is correct (tables populated, columns updated for the right states), it comes with a caveat: they do not test that
* deserialising states with the attachment classloader works correctly.
*
* The reason for this is that it is impossible to do so. There is no real way of writing a unit or integration test to upgrade from one
* version to another (at the time of writing). These tests simulate a small part of the upgrade process by directly using hibernate to
* populate a database as a V3 node would, then running the migration class. However, it is impossible to do this for attachments as there
* is no contract state jar to serialise.
*/
class VaultStateMigrationTest {
companion object {
val alice = TestIdentity(ALICE_NAME, 70)
@ -102,13 +121,42 @@ class VaultStateMigrationTest {
saveOurKeys(listOf(bob.keyPair, bob2.keyPair))
saveAllIdentities(listOf(BOB_IDENTITY, ALICE_IDENTITY, BOC_IDENTITY, dummyNotary.identity, BOB2_IDENTITY))
addNetworkParameters()
}
@After
fun close() {
contextTransactionOrNull?.close()
cordaDB.close()
}
private fun addNetworkParameters() {
cordaDB.transaction {
val clock = Clock.systemUTC()
val params = NetworkParameters(
1,
listOf(NotaryInfo(DUMMY_NOTARY, false), NotaryInfo(CHARLIE, false)),
1,
1,
clock.instant(),
1,
mapOf(),
Duration.ZERO,
mapOf()
)
val signedParams = params.signWithCert(bob.keyPair.private, BOB_IDENTITY.certificate)
val persistentParams = DBNetworkParametersStorage.PersistentNetworkParameters(
SecureHash.allOnesHash.toString(),
params.epoch,
signedParams.raw.bytes,
signedParams.sig.bytes,
signedParams.sig.by.encoded,
X509Utilities.buildCertPath(signedParams.sig.parentCertsChain).encoded
)
session.save(persistentParams)
}
}
private fun createCashTransaction(cash: Cash, value: Amount<Currency>, owner: AbstractParty): SignedTransaction {
val tx = TransactionBuilder(DUMMY_NOTARY)
cash.generateIssue(tx, Amount(value.quantity, Issued(bankOfCorda.ref(1), value.token)), owner, DUMMY_NOTARY)
@ -245,6 +293,33 @@ class VaultStateMigrationTest {
}
}
private fun createNotaryChangeTransaction(inputs: List<StateRef>, paramsHash: SecureHash): SignedTransaction {
val notaryTx = NotaryChangeTransactionBuilder(inputs, DUMMY_NOTARY, CHARLIE, paramsHash).build()
val notaryKey = DUMMY_NOTARY.owningKey
val signableData = SignableData(notaryTx.id, SignatureMetadata(3, Crypto.findSignatureScheme(notaryKey).schemeNumberID))
val notarySignature = notaryServices.keyManagementService.sign(signableData, notaryKey)
return SignedTransaction(notaryTx, listOf(notarySignature))
}
private fun createVaultStatesFromNotaryChangeTransaction(tx: SignedTransaction, inputs: List<TransactionState<ContractState>>) {
cordaDB.transaction {
inputs.forEachIndexed { index, state ->
val constraintInfo = Vault.ConstraintInfo(state.constraint)
val persistentState = VaultSchemaV1.VaultStates(
notary = tx.notary!!,
contractStateClassName = state.data.javaClass.name,
stateStatus = Vault.StateStatus.UNCONSUMED,
recordedTime = clock.instant(),
relevancyStatus = Vault.RelevancyStatus.RELEVANT, //Always persist as relevant to mimic V3
constraintType = constraintInfo.type(),
constraintData = constraintInfo.data()
)
persistentState.stateRef = PersistentStateRef(tx.id.toString(), index)
session.save(persistentState)
}
}
}
private fun <T> getState(clazz: Class<T>): T {
return cordaDB.transaction {
val criteriaBuilder = cordaDB.entityManagerFactory.criteriaBuilder
@ -429,6 +504,24 @@ class VaultStateMigrationTest {
assertEquals(0, getStatePartyCount())
}
@Test
fun `State created with notary change transaction can be migrated`() {
// This test is a little bit of a hack - it checks that these states are migrated correctly by looking at params in the database,
// but these will not be there for V3 nodes. Handling for this must be tested manually.
val cashTx = createCashTransaction(Cash(), 5.DOLLARS, BOB)
val cashTx2 = createCashTransaction(Cash(), 10.DOLLARS, BOB)
val notaryTx = createNotaryChangeTransaction(listOf(StateRef(cashTx.id, 0), StateRef(cashTx2.id, 0)), SecureHash.allOnesHash)
createVaultStatesFromTransaction(cashTx, stateStatus = Vault.StateStatus.CONSUMED)
createVaultStatesFromTransaction(cashTx2, stateStatus = Vault.StateStatus.CONSUMED)
createVaultStatesFromNotaryChangeTransaction(notaryTx, cashTx.coreTransaction.outputs + cashTx2.coreTransaction.outputs)
storeTransaction(cashTx)
storeTransaction(cashTx2)
storeTransaction(notaryTx)
val migration = VaultStateMigration()
migration.execute(liquibaseDB)
assertEquals(2, getStatePartyCount())
}
// Used to test migration performance
@Test
@Ignore