diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt index 6b13b24f56..a9fa015410 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt @@ -17,17 +17,13 @@ import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.messaging.DataFeed import net.corda.core.messaging.StateMachineTransactionMapping import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage -import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit +import net.corda.nodeapi.internal.persistence.currentDBSession import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import rx.subjects.PublishSubject import java.util.* import javax.annotation.concurrent.ThreadSafe -import javax.persistence.Column -import javax.persistence.Entity -import javax.persistence.Id /** * Database storage of a txhash -> state machine id mapping. @@ -37,36 +33,7 @@ import javax.persistence.Id */ @ThreadSafe class DBTransactionMappingStorage(private val database: CordaPersistence) : StateMachineRecordedTransactionMappingStorage { - - @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}transaction_mappings") - class DBTransactionMapping( - @Id - @Column(name = "tx_id", length = 64, nullable = false) - var txId: String = "", - - @Column(name = "state_machine_run_id", length = 36, nullable = true) - var stateMachineRunId: String? = "" - ) - - private companion object { - fun createMap(): AppendOnlyPersistentMap { - return AppendOnlyPersistentMap( - toPersistentEntityKey = { it.toString() }, - fromPersistentEntity = { Pair(SecureHash.parse(it.txId), StateMachineRunId(UUID.fromString(it.stateMachineRunId))) }, - toPersistentEntity = { key: SecureHash, (uuid) -> - DBTransactionMapping().apply { - txId = key.toString() - stateMachineRunId = uuid.toString() - } - }, - persistentEntityClass = DBTransactionMapping::class.java - ) - } - } - private class InnerState { - val stateMachineTransactionMap = createMap() val updates: PublishSubject = PublishSubject.create() } @@ -75,7 +42,6 @@ class DBTransactionMappingStorage(private val database: CordaPersistence) : Stat override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) { database.transaction { concurrentBox.concurrent { - stateMachineTransactionMap.addWithDuplicatesAllowed(transactionId, stateMachineRunId) updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId)) } } @@ -84,8 +50,14 @@ class DBTransactionMappingStorage(private val database: CordaPersistence) : Stat override fun track(): DataFeed, StateMachineTransactionMapping> { return database.transaction { concurrentBox.exclusive { - DataFeed(stateMachineTransactionMap.allPersisted().map { StateMachineTransactionMapping(it.second, it.first) }.toList(), - updates.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + val session = currentDBSession() + val cb = session.criteriaBuilder + val cq = cb.createTupleQuery() + val from = cq.from(DBTransactionStorage.DBTransaction::class.java) + cq.multiselect(from.get(DBTransactionStorage.DBTransaction::stateMachineRunId.name), from.get(DBTransactionStorage.DBTransaction::txId.name)) + cq.where(cb.isNotNull(from.get(DBTransactionStorage.DBTransaction::stateMachineRunId.name))) + val flowIds = session.createQuery(cq).resultList.map { StateMachineTransactionMapping(StateMachineRunId(UUID.fromString(it[0] as String)), SecureHash.parse(it[1] as String)) } + DataFeed(flowIds, updates.bufferUntilSubscribed().wrapWithDatabaseTransaction()) } } } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index c3561b2ce1..3b8246c47f 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -23,6 +23,7 @@ import net.corda.core.toFuture import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.SignedTransaction import net.corda.node.services.api.WritableTransactionStorage +import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.utilities.AppendOnlyPersistentMapBase import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -50,6 +51,9 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers @Column(name = "tx_id", length = 64, nullable = false) var txId: String = "", + @Column(name = "state_machine_run_id", length = 36, nullable = true) + var stateMachineRunId: String? = "", + @Lob @Column(name = "transaction_value", nullable = false) var transaction: ByteArray = EMPTY_BYTE_ARRAY @@ -68,6 +72,7 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers toPersistentEntity = { key: SecureHash, value: TxCacheValue -> DBTransaction().apply { txId = key.toString() + stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString() transaction = value.toSignedTx(). serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes } diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 3be330aba5..107d37b0bb 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -13,10 +13,6 @@ package net.corda.node.services.schema import net.corda.core.contracts.ContractState import net.corda.core.contracts.FungibleAsset import net.corda.core.contracts.LinearState -import net.corda.core.schemas.CommonSchemaV1 -import net.corda.core.schemas.MappedSchema -import net.corda.core.schemas.PersistentState -import net.corda.core.schemas.QueryableState import net.corda.core.schemas.* import net.corda.core.schemas.MappedSchemaValidator.crossReferencesToOtherMappedSchema import net.corda.core.serialization.SingletonSerializeAsToken @@ -28,7 +24,6 @@ import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.P2PMessageDeduplicator import net.corda.node.services.persistence.DBCheckpointStorage -import net.corda.node.services.persistence.DBTransactionMappingStorage import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.persistence.RunOnceService @@ -52,7 +47,6 @@ class NodeSchemaService(extraSchemas: Set = emptySet(), includeNot object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1, mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java, DBTransactionStorage.DBTransaction::class.java, - DBTransactionMappingStorage.DBTransactionMapping::class.java, PersistentKeyManagementService.PersistentKey::class.java, NodeSchedulerService.PersistentScheduledState::class.java, NodeAttachmentService.DBAttachment::class.java,