From 30d07bc9982a468099bc94d57a2a1bf3359ad9c8 Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Wed, 22 Aug 2018 14:14:55 +0100 Subject: [PATCH] ENT-2202 Fold transaction mapping column into transactions table (#3836) * ENT-2202 Fold transaction mapping column into transactions table * Remove from node schema. * Fix bug. --- .../DBTransactionMappingStorage.kt | 46 ++++--------------- .../persistence/DBTransactionStorage.kt | 5 ++ .../node/services/schema/NodeSchemaService.kt | 6 --- 3 files changed, 14 insertions(+), 43 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt index 3073735e82..d3bec19d88 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt @@ -6,17 +6,13 @@ import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.messaging.DataFeed import net.corda.core.messaging.StateMachineTransactionMapping import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage -import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit +import net.corda.nodeapi.internal.persistence.currentDBSession import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import rx.subjects.PublishSubject import java.util.* import javax.annotation.concurrent.ThreadSafe -import javax.persistence.Column -import javax.persistence.Entity -import javax.persistence.Id /** * Database storage of a txhash -> state machine id mapping. @@ -26,46 +22,22 @@ import javax.persistence.Id */ @ThreadSafe class DBTransactionMappingStorage(private val database: CordaPersistence) : StateMachineRecordedTransactionMappingStorage { - - @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}transaction_mappings") - class DBTransactionMapping( - @Id - @Column(name = "tx_id", length = 64, nullable = false) - var txId: String = "", - - @Column(name = "state_machine_run_id", length = 36, nullable = true) - var stateMachineRunId: String? = "" - ) - - private companion object { - fun createMap(): AppendOnlyPersistentMap { - return AppendOnlyPersistentMap( - toPersistentEntityKey = { it.toString() }, - fromPersistentEntity = { Pair(SecureHash.parse(it.txId), StateMachineRunId(UUID.fromString(it.stateMachineRunId))) }, - toPersistentEntity = { key: SecureHash, (uuid) -> - DBTransactionMapping().apply { - txId = key.toString() - stateMachineRunId = uuid.toString() - } - }, - persistentEntityClass = DBTransactionMapping::class.java - ) - } - } - - val stateMachineTransactionMap = createMap() val updates: PublishSubject = PublishSubject.create() override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) { database.transaction { - stateMachineTransactionMap.addWithDuplicatesAllowed(transactionId, stateMachineRunId) updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId)) } } override fun track(): DataFeed, StateMachineTransactionMapping> = database.transaction { - DataFeed(stateMachineTransactionMap.allPersisted().map { StateMachineTransactionMapping(it.second, it.first) }.toList(), - updates.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + val session = currentDBSession() + val cb = session.criteriaBuilder + val cq = cb.createTupleQuery() + val from = cq.from(DBTransactionStorage.DBTransaction::class.java) + cq.multiselect(from.get(DBTransactionStorage.DBTransaction::stateMachineRunId.name), from.get(DBTransactionStorage.DBTransaction::txId.name)) + cq.where(cb.isNotNull(from.get(DBTransactionStorage.DBTransaction::stateMachineRunId.name))) + val flowIds = session.createQuery(cq).resultList.map { StateMachineTransactionMapping(StateMachineRunId(UUID.fromString(it[0] as String)), SecureHash.parse(it[1] as String)) } + DataFeed(flowIds, updates.bufferUntilSubscribed().wrapWithDatabaseTransaction()) } } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 239ab70fab..9d958870c4 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -13,6 +13,7 @@ import net.corda.core.toFuture import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.SignedTransaction import net.corda.node.services.api.WritableTransactionStorage +import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.utilities.AppendOnlyPersistentMapBase import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -39,6 +40,9 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers @Column(name = "tx_id", length = 64, nullable = false) var txId: String = "", + @Column(name = "state_machine_run_id", length = 36, nullable = true) + var stateMachineRunId: String? = "", + @Lob @Column(name = "transaction_value", nullable = false) var transaction: ByteArray = EMPTY_BYTE_ARRAY @@ -57,6 +61,7 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers toPersistentEntity = { key: SecureHash, value: TxCacheValue -> DBTransaction().apply { txId = key.toString() + stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString() transaction = value.toSignedTx().serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes } }, diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 0421e00220..80fca08e14 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -3,10 +3,6 @@ package net.corda.node.services.schema import net.corda.core.contracts.ContractState import net.corda.core.contracts.FungibleAsset import net.corda.core.contracts.LinearState -import net.corda.core.schemas.CommonSchemaV1 -import net.corda.core.schemas.MappedSchema -import net.corda.core.schemas.PersistentState -import net.corda.core.schemas.QueryableState import net.corda.core.schemas.* import net.corda.core.schemas.MappedSchemaValidator.crossReferencesToOtherMappedSchema import net.corda.core.serialization.SingletonSerializeAsToken @@ -18,7 +14,6 @@ import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.P2PMessageDeduplicator import net.corda.node.services.persistence.DBCheckpointStorage -import net.corda.node.services.persistence.DBTransactionMappingStorage import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.transactions.BFTNonValidatingNotaryService @@ -41,7 +36,6 @@ class NodeSchemaService(extraSchemas: Set = emptySet(), includeNot object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1, mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java, DBTransactionStorage.DBTransaction::class.java, - DBTransactionMappingStorage.DBTransactionMapping::class.java, PersistentKeyManagementService.PersistentKey::class.java, NodeSchedulerService.PersistentScheduledState::class.java, NodeAttachmentService.DBAttachment::class.java,