mirror of
https://github.com/corda/corda.git
synced 2025-06-21 00:23:09 +00:00
Merge pull request #1351 from corda/parkri-os-merge-20180822-1
OS -> ENT merge of transaction mapping schema change
This commit is contained in:
@ -17,17 +17,13 @@ import net.corda.core.internal.bufferUntilSubscribed
|
|||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.messaging.StateMachineTransactionMapping
|
import net.corda.core.messaging.StateMachineTransactionMapping
|
||||||
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
|
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
|
||||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
|
||||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||||
|
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||||
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
import javax.persistence.Column
|
|
||||||
import javax.persistence.Entity
|
|
||||||
import javax.persistence.Id
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Database storage of a txhash -> state machine id mapping.
|
* Database storage of a txhash -> state machine id mapping.
|
||||||
@ -37,36 +33,7 @@ import javax.persistence.Id
|
|||||||
*/
|
*/
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
class DBTransactionMappingStorage(private val database: CordaPersistence) : StateMachineRecordedTransactionMappingStorage {
|
class DBTransactionMappingStorage(private val database: CordaPersistence) : StateMachineRecordedTransactionMappingStorage {
|
||||||
|
|
||||||
@Entity
|
|
||||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}transaction_mappings")
|
|
||||||
class DBTransactionMapping(
|
|
||||||
@Id
|
|
||||||
@Column(name = "tx_id", length = 64, nullable = false)
|
|
||||||
var txId: String = "",
|
|
||||||
|
|
||||||
@Column(name = "state_machine_run_id", length = 36, nullable = true)
|
|
||||||
var stateMachineRunId: String? = ""
|
|
||||||
)
|
|
||||||
|
|
||||||
private companion object {
|
|
||||||
fun createMap(): AppendOnlyPersistentMap<SecureHash, StateMachineRunId, DBTransactionMapping, String> {
|
|
||||||
return AppendOnlyPersistentMap(
|
|
||||||
toPersistentEntityKey = { it.toString() },
|
|
||||||
fromPersistentEntity = { Pair(SecureHash.parse(it.txId), StateMachineRunId(UUID.fromString(it.stateMachineRunId))) },
|
|
||||||
toPersistentEntity = { key: SecureHash, (uuid) ->
|
|
||||||
DBTransactionMapping().apply {
|
|
||||||
txId = key.toString()
|
|
||||||
stateMachineRunId = uuid.toString()
|
|
||||||
}
|
|
||||||
},
|
|
||||||
persistentEntityClass = DBTransactionMapping::class.java
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class InnerState {
|
private class InnerState {
|
||||||
val stateMachineTransactionMap = createMap()
|
|
||||||
val updates: PublishSubject<StateMachineTransactionMapping> = PublishSubject.create()
|
val updates: PublishSubject<StateMachineTransactionMapping> = PublishSubject.create()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,7 +42,6 @@ class DBTransactionMappingStorage(private val database: CordaPersistence) : Stat
|
|||||||
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
|
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
|
||||||
database.transaction {
|
database.transaction {
|
||||||
concurrentBox.concurrent {
|
concurrentBox.concurrent {
|
||||||
stateMachineTransactionMap.addWithDuplicatesAllowed(transactionId, stateMachineRunId)
|
|
||||||
updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
|
updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -84,8 +50,14 @@ class DBTransactionMappingStorage(private val database: CordaPersistence) : Stat
|
|||||||
override fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
|
override fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
|
||||||
return database.transaction {
|
return database.transaction {
|
||||||
concurrentBox.exclusive {
|
concurrentBox.exclusive {
|
||||||
DataFeed(stateMachineTransactionMap.allPersisted().map { StateMachineTransactionMapping(it.second, it.first) }.toList(),
|
val session = currentDBSession()
|
||||||
updates.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
val cb = session.criteriaBuilder
|
||||||
|
val cq = cb.createTupleQuery()
|
||||||
|
val from = cq.from(DBTransactionStorage.DBTransaction::class.java)
|
||||||
|
cq.multiselect(from.get<String>(DBTransactionStorage.DBTransaction::stateMachineRunId.name), from.get<String>(DBTransactionStorage.DBTransaction::txId.name))
|
||||||
|
cq.where(cb.isNotNull(from.get<String>(DBTransactionStorage.DBTransaction::stateMachineRunId.name)))
|
||||||
|
val flowIds = session.createQuery(cq).resultList.map { StateMachineTransactionMapping(StateMachineRunId(UUID.fromString(it[0] as String)), SecureHash.parse(it[1] as String)) }
|
||||||
|
DataFeed(flowIds, updates.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@ import net.corda.core.toFuture
|
|||||||
import net.corda.core.transactions.CoreTransaction
|
import net.corda.core.transactions.CoreTransaction
|
||||||
import net.corda.core.transactions.SignedTransaction
|
import net.corda.core.transactions.SignedTransaction
|
||||||
import net.corda.node.services.api.WritableTransactionStorage
|
import net.corda.node.services.api.WritableTransactionStorage
|
||||||
|
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||||
import net.corda.node.utilities.AppendOnlyPersistentMapBase
|
import net.corda.node.utilities.AppendOnlyPersistentMapBase
|
||||||
import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
|
import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
@ -50,6 +51,9 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers
|
|||||||
@Column(name = "tx_id", length = 64, nullable = false)
|
@Column(name = "tx_id", length = 64, nullable = false)
|
||||||
var txId: String = "",
|
var txId: String = "",
|
||||||
|
|
||||||
|
@Column(name = "state_machine_run_id", length = 36, nullable = true)
|
||||||
|
var stateMachineRunId: String? = "",
|
||||||
|
|
||||||
@Lob
|
@Lob
|
||||||
@Column(name = "transaction_value", nullable = false)
|
@Column(name = "transaction_value", nullable = false)
|
||||||
var transaction: ByteArray = EMPTY_BYTE_ARRAY
|
var transaction: ByteArray = EMPTY_BYTE_ARRAY
|
||||||
@ -68,6 +72,7 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers
|
|||||||
toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
|
toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
|
||||||
DBTransaction().apply {
|
DBTransaction().apply {
|
||||||
txId = key.toString()
|
txId = key.toString()
|
||||||
|
stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString()
|
||||||
transaction = value.toSignedTx().
|
transaction = value.toSignedTx().
|
||||||
serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes
|
serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes
|
||||||
}
|
}
|
||||||
|
@ -13,10 +13,6 @@ package net.corda.node.services.schema
|
|||||||
import net.corda.core.contracts.ContractState
|
import net.corda.core.contracts.ContractState
|
||||||
import net.corda.core.contracts.FungibleAsset
|
import net.corda.core.contracts.FungibleAsset
|
||||||
import net.corda.core.contracts.LinearState
|
import net.corda.core.contracts.LinearState
|
||||||
import net.corda.core.schemas.CommonSchemaV1
|
|
||||||
import net.corda.core.schemas.MappedSchema
|
|
||||||
import net.corda.core.schemas.PersistentState
|
|
||||||
import net.corda.core.schemas.QueryableState
|
|
||||||
import net.corda.core.schemas.*
|
import net.corda.core.schemas.*
|
||||||
import net.corda.core.schemas.MappedSchemaValidator.crossReferencesToOtherMappedSchema
|
import net.corda.core.schemas.MappedSchemaValidator.crossReferencesToOtherMappedSchema
|
||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||||
@ -28,7 +24,6 @@ import net.corda.node.services.identity.PersistentIdentityService
|
|||||||
import net.corda.node.services.keys.PersistentKeyManagementService
|
import net.corda.node.services.keys.PersistentKeyManagementService
|
||||||
import net.corda.node.services.messaging.P2PMessageDeduplicator
|
import net.corda.node.services.messaging.P2PMessageDeduplicator
|
||||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||||
import net.corda.node.services.persistence.DBTransactionMappingStorage
|
|
||||||
import net.corda.node.services.persistence.DBTransactionStorage
|
import net.corda.node.services.persistence.DBTransactionStorage
|
||||||
import net.corda.node.services.persistence.NodeAttachmentService
|
import net.corda.node.services.persistence.NodeAttachmentService
|
||||||
import net.corda.node.services.persistence.RunOnceService
|
import net.corda.node.services.persistence.RunOnceService
|
||||||
@ -52,7 +47,6 @@ class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet(), includeNot
|
|||||||
object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1,
|
object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1,
|
||||||
mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java,
|
mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java,
|
||||||
DBTransactionStorage.DBTransaction::class.java,
|
DBTransactionStorage.DBTransaction::class.java,
|
||||||
DBTransactionMappingStorage.DBTransactionMapping::class.java,
|
|
||||||
PersistentKeyManagementService.PersistentKey::class.java,
|
PersistentKeyManagementService.PersistentKey::class.java,
|
||||||
NodeSchedulerService.PersistentScheduledState::class.java,
|
NodeSchedulerService.PersistentScheduledState::class.java,
|
||||||
NodeAttachmentService.DBAttachment::class.java,
|
NodeAttachmentService.DBAttachment::class.java,
|
||||||
|
@ -10,9 +10,8 @@
|
|||||||
-->
|
-->
|
||||||
|
|
||||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||||
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
|
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||||
|
|
||||||
<include file="migration/node-core.changelog-init.xml"/>
|
<include file="migration/node-core.changelog-init.xml"/>
|
||||||
<include file="migration/node-core.changelog-v3.xml"/>
|
<include file="migration/node-core.changelog-v3.xml"/>
|
||||||
@ -20,5 +19,6 @@
|
|||||||
<include file="migration/node-core.changelog-v5.xml"/>
|
<include file="migration/node-core.changelog-v5.xml"/>
|
||||||
<include file="migration/node-core.changelog-pkey.xml"/>
|
<include file="migration/node-core.changelog-pkey.xml"/>
|
||||||
<include file="migration/node-core.changelog-postgres-blob.xml"/>
|
<include file="migration/node-core.changelog-postgres-blob.xml"/>
|
||||||
|
<include file="migration/node-core.changelog-tx-mapping.xml"/>
|
||||||
|
|
||||||
</databaseChangeLog>
|
</databaseChangeLog>
|
||||||
|
@ -0,0 +1,27 @@
|
|||||||
|
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||||
|
<!--
|
||||||
|
~ R3 Proprietary and Confidential
|
||||||
|
~
|
||||||
|
~ Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||||
|
~
|
||||||
|
~ The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||||
|
~
|
||||||
|
~ Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||||
|
<changeSet author="R3.Corda" id="add_tx_mapping_column">
|
||||||
|
<addColumn tableName="node_transactions">
|
||||||
|
<column name="state_machine_run_id" type="NVARCHAR(36)">
|
||||||
|
<constraints nullable="true"/>
|
||||||
|
</column>
|
||||||
|
</addColumn>
|
||||||
|
<!-- Copy old values from the table to the new column -->
|
||||||
|
<sql>update node_transactions set state_machine_run_id=(select state_machine_run_id from
|
||||||
|
node_transaction_mappings where node_transactions.tx_id = node_transaction_mappings.tx_id)
|
||||||
|
</sql>
|
||||||
|
<dropTable tableName="node_transaction_mappings"/>
|
||||||
|
</changeSet>
|
||||||
|
</databaseChangeLog>
|
Reference in New Issue
Block a user