mirror of
https://github.com/corda/corda.git
synced 2025-01-18 10:46:38 +00:00
[ENT-4024] Use string for the status column in the transaction table (#5379)
This commit is contained in:
parent
57d0d8c10c
commit
d1b9e95cd5
@ -70,16 +70,16 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
for (downloaded in downloadedTxs) {
|
||||
suspended = false
|
||||
val dependencies = downloaded.dependencies
|
||||
val downloadedTxs = this.txsInCheckpoint
|
||||
if (downloadedTxs != null) {
|
||||
if (downloadedTxs.size < MAX_CHECKPOINT_RESOLUTION) {
|
||||
downloadedTxs[downloaded.id] = downloaded
|
||||
val checkpointedTxs = this.txsInCheckpoint
|
||||
if (checkpointedTxs != null) {
|
||||
if (checkpointedTxs.size < MAX_CHECKPOINT_RESOLUTION) {
|
||||
checkpointedTxs[downloaded.id] = downloaded
|
||||
} else {
|
||||
logger.debug {
|
||||
"Resolving transaction dependencies has reached a checkpoint limit of $MAX_CHECKPOINT_RESOLUTION " +
|
||||
"transactions. Switching to the node database for storing the unverified transactions."
|
||||
}
|
||||
downloadedTxs.values.forEach(transactionStorage::addUnverifiedTransaction)
|
||||
checkpointedTxs.values.forEach(transactionStorage::addUnverifiedTransaction)
|
||||
// This acts as both a flag that we've switched over to storing the backchain into the db, and to remove what's been
|
||||
// built up in the checkpoint
|
||||
this.txsInCheckpoint = null
|
||||
@ -92,8 +92,9 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
// The write locks are only released over a suspend, so need to keep track of whether the flow has been suspended to ensure
|
||||
// that locks are not held beyond each while loop iteration (as doing this would result in a deadlock due to claiming locks
|
||||
// in the wrong order)
|
||||
suspended = suspended || flow.fetchMissingAttachments(downloaded)
|
||||
suspended = suspended || flow.fetchMissingNetworkParameters(downloaded)
|
||||
val suspendedViaAttachments = flow.fetchMissingAttachments(downloaded)
|
||||
val suspendedViaParams = flow.fetchMissingNetworkParameters(downloaded)
|
||||
suspended = suspended || suspendedViaAttachments || suspendedViaParams
|
||||
|
||||
// Add all input states and reference input states to the work queue.
|
||||
nextRequests.addAll(dependencies)
|
||||
|
@ -44,18 +44,19 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
@Column(name = "transaction_value", nullable = false)
|
||||
val transaction: ByteArray,
|
||||
|
||||
@Column(name = "status", nullable = false)
|
||||
val status: Char
|
||||
@Column(name = "status", nullable = false, length = 1)
|
||||
@Convert(converter = TransactionStatusConverter::class)
|
||||
val status: TransactionStatus
|
||||
)
|
||||
|
||||
private enum class TransactionStatus {
|
||||
enum class TransactionStatus {
|
||||
UNVERIFIED,
|
||||
VERIFIED;
|
||||
|
||||
fun toDatabaseChar(): Char {
|
||||
fun toDatabaseValue(): String {
|
||||
return when (this) {
|
||||
UNVERIFIED -> 'U'
|
||||
VERIFIED -> 'V'
|
||||
UNVERIFIED -> "U"
|
||||
VERIFIED -> "V"
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,16 +65,27 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
}
|
||||
|
||||
companion object {
|
||||
fun fromDatabaseChar(databaseValue: Char): TransactionStatus {
|
||||
fun fromDatabaseValue(databaseValue: String): TransactionStatus {
|
||||
return when(databaseValue) {
|
||||
'V' -> VERIFIED
|
||||
'U' -> UNVERIFIED
|
||||
"V" -> VERIFIED
|
||||
"U" -> UNVERIFIED
|
||||
else -> throw UnexpectedStatusValueException(databaseValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class UnexpectedStatusValueException(status: Char): Exception("Found unexpected status value $status in transaction store")
|
||||
private class UnexpectedStatusValueException(status: String): Exception("Found unexpected status value $status in transaction store")
|
||||
}
|
||||
|
||||
@Converter
|
||||
class TransactionStatusConverter : AttributeConverter<TransactionStatus, String> {
|
||||
override fun convertToDatabaseColumn(attribute: TransactionStatus): String {
|
||||
return attribute.toDatabaseValue()
|
||||
}
|
||||
|
||||
override fun convertToEntityAttribute(dbData: String): TransactionStatus {
|
||||
return TransactionStatus.fromDatabaseValue(dbData)
|
||||
}
|
||||
}
|
||||
|
||||
private companion object {
|
||||
@ -101,14 +113,14 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
fromPersistentEntity = {
|
||||
SecureHash.parse(it.txId) to TxCacheValue(
|
||||
it.transaction.deserialize(context = contextToUse()),
|
||||
TransactionStatus.fromDatabaseChar(it.status))
|
||||
it.status)
|
||||
},
|
||||
toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
|
||||
DBTransaction(
|
||||
txId = key.toString(),
|
||||
stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString(),
|
||||
transaction = value.toSignedTx().serialize(context = contextToUse().withEncoding(SNAPPY)).bytes,
|
||||
status = value.status.toDatabaseChar()
|
||||
status = value.status
|
||||
)
|
||||
},
|
||||
persistentEntityClass = DBTransaction::class.java,
|
||||
@ -129,10 +141,10 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
val criteriaUpdate = criteriaBuilder.createCriteriaUpdate(DBTransaction::class.java)
|
||||
val updateRoot = criteriaUpdate.from(DBTransaction::class.java)
|
||||
criteriaUpdate.set(DBTransaction::status.name, TransactionStatus.VERIFIED.toDatabaseChar())
|
||||
criteriaUpdate.set(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED)
|
||||
criteriaUpdate.where(criteriaBuilder.and(
|
||||
criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()),
|
||||
criteriaBuilder.equal(updateRoot.get<Boolean>(DBTransaction::status.name), TransactionStatus.UNVERIFIED.toDatabaseChar())
|
||||
criteriaBuilder.equal(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.UNVERIFIED)
|
||||
))
|
||||
val update = session.createQuery(criteriaUpdate)
|
||||
val rowsUpdated = update.executeUpdate()
|
||||
|
@ -4,9 +4,9 @@
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
|
||||
logicalFilePath="migration/node-services.changelog-init.xml">
|
||||
|
||||
<changeSet author="R3.Corda" id="add_is_verified_column">
|
||||
<changeSet author="R3.Corda" id="add_status_column">
|
||||
<addColumn tableName="node_transactions">
|
||||
<column name="status" type="NCHAR(255)" defaultValue="V">
|
||||
<column name="status" type="NVARCHAR(1)" defaultValue="V">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
</addColumn>
|
||||
|
@ -212,7 +212,7 @@ class VaultStateMigrationTest {
|
||||
txId = tx.id.toString(),
|
||||
stateMachineRunId = null,
|
||||
transaction = tx.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes,
|
||||
status = 'V'
|
||||
status = DBTransactionStorage.TransactionStatus.VERIFIED
|
||||
)
|
||||
session.save(persistentTx)
|
||||
}
|
||||
|
@ -284,7 +284,7 @@ class RetryFlowMockTest {
|
||||
}
|
||||
|
||||
private fun doInsert() {
|
||||
val tx = DBTransactionStorage.DBTransaction("Foo", null, Utils.EMPTY_BYTES, 'V')
|
||||
val tx = DBTransactionStorage.DBTransaction("Foo", null, Utils.EMPTY_BYTES, DBTransactionStorage.TransactionStatus.VERIFIED)
|
||||
contextTransaction.session.save(tx)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user