ENT-2202 Fold transaction mapping column into transactions table (#3836)

* ENT-2202 Fold transaction mapping column into transactions table

* Remove from node schema.

* Fix bug.
This commit is contained in:
Rick Parker 2018-08-22 14:14:55 +01:00 committed by GitHub
parent 2fae95c58f
commit 30d07bc998
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 14 additions and 43 deletions

View File

@ -6,17 +6,13 @@ import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.util.* import java.util.*
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
/** /**
* Database storage of a txhash -> state machine id mapping. * Database storage of a txhash -> state machine id mapping.
@ -26,46 +22,22 @@ import javax.persistence.Id
*/ */
@ThreadSafe @ThreadSafe
class DBTransactionMappingStorage(private val database: CordaPersistence) : StateMachineRecordedTransactionMappingStorage { class DBTransactionMappingStorage(private val database: CordaPersistence) : StateMachineRecordedTransactionMappingStorage {
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}transaction_mappings")
class DBTransactionMapping(
@Id
@Column(name = "tx_id", length = 64, nullable = false)
var txId: String = "",
@Column(name = "state_machine_run_id", length = 36, nullable = true)
var stateMachineRunId: String? = ""
)
private companion object {
fun createMap(): AppendOnlyPersistentMap<SecureHash, StateMachineRunId, DBTransactionMapping, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = { Pair(SecureHash.parse(it.txId), StateMachineRunId(UUID.fromString(it.stateMachineRunId))) },
toPersistentEntity = { key: SecureHash, (uuid) ->
DBTransactionMapping().apply {
txId = key.toString()
stateMachineRunId = uuid.toString()
}
},
persistentEntityClass = DBTransactionMapping::class.java
)
}
}
val stateMachineTransactionMap = createMap()
val updates: PublishSubject<StateMachineTransactionMapping> = PublishSubject.create() val updates: PublishSubject<StateMachineTransactionMapping> = PublishSubject.create()
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) { override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
database.transaction { database.transaction {
stateMachineTransactionMap.addWithDuplicatesAllowed(transactionId, stateMachineRunId)
updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId)) updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
} }
} }
override fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> = database.transaction { override fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> = database.transaction {
DataFeed(stateMachineTransactionMap.allPersisted().map { StateMachineTransactionMapping(it.second, it.first) }.toList(), val session = currentDBSession()
updates.bufferUntilSubscribed().wrapWithDatabaseTransaction()) val cb = session.criteriaBuilder
val cq = cb.createTupleQuery()
val from = cq.from(DBTransactionStorage.DBTransaction::class.java)
cq.multiselect(from.get<String>(DBTransactionStorage.DBTransaction::stateMachineRunId.name), from.get<String>(DBTransactionStorage.DBTransaction::txId.name))
cq.where(cb.isNotNull(from.get<String>(DBTransactionStorage.DBTransaction::stateMachineRunId.name)))
val flowIds = session.createQuery(cq).resultList.map { StateMachineTransactionMapping(StateMachineRunId(UUID.fromString(it[0] as String)), SecureHash.parse(it[1] as String)) }
DataFeed(flowIds, updates.bufferUntilSubscribed().wrapWithDatabaseTransaction())
} }
} }

View File

@ -13,6 +13,7 @@ import net.corda.core.toFuture
import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.AppendOnlyPersistentMapBase import net.corda.node.utilities.AppendOnlyPersistentMapBase
import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -39,6 +40,9 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers
@Column(name = "tx_id", length = 64, nullable = false) @Column(name = "tx_id", length = 64, nullable = false)
var txId: String = "", var txId: String = "",
@Column(name = "state_machine_run_id", length = 36, nullable = true)
var stateMachineRunId: String? = "",
@Lob @Lob
@Column(name = "transaction_value", nullable = false) @Column(name = "transaction_value", nullable = false)
var transaction: ByteArray = EMPTY_BYTE_ARRAY var transaction: ByteArray = EMPTY_BYTE_ARRAY
@ -57,6 +61,7 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers
toPersistentEntity = { key: SecureHash, value: TxCacheValue -> toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
DBTransaction().apply { DBTransaction().apply {
txId = key.toString() txId = key.toString()
stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString()
transaction = value.toSignedTx().serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes transaction = value.toSignedTx().serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
} }
}, },

View File

@ -3,10 +3,6 @@ package net.corda.node.services.schema
import net.corda.core.contracts.ContractState import net.corda.core.contracts.ContractState
import net.corda.core.contracts.FungibleAsset import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.LinearState import net.corda.core.contracts.LinearState
import net.corda.core.schemas.CommonSchemaV1
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.core.schemas.* import net.corda.core.schemas.*
import net.corda.core.schemas.MappedSchemaValidator.crossReferencesToOtherMappedSchema import net.corda.core.schemas.MappedSchemaValidator.crossReferencesToOtherMappedSchema
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
@ -18,7 +14,6 @@ import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.P2PMessageDeduplicator import net.corda.node.services.messaging.P2PMessageDeduplicator
import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.transactions.BFTNonValidatingNotaryService import net.corda.node.services.transactions.BFTNonValidatingNotaryService
@ -41,7 +36,6 @@ class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet(), includeNot
object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1, object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1,
mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java, mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java,
DBTransactionStorage.DBTransaction::class.java, DBTransactionStorage.DBTransaction::class.java,
DBTransactionMappingStorage.DBTransactionMapping::class.java,
PersistentKeyManagementService.PersistentKey::class.java, PersistentKeyManagementService.PersistentKey::class.java,
NodeSchedulerService.PersistentScheduledState::class.java, NodeSchedulerService.PersistentScheduledState::class.java,
NodeAttachmentService.DBAttachment::class.java, NodeAttachmentService.DBAttachment::class.java,