diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index a0c9b7712c..750ce86f73 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -37,6 +37,7 @@ import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import net.corda.nodeapi.internal.persistence.currentDBSession import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY +import org.hibernate.annotations.Type import rx.Observable import rx.subjects.PublishSubject import java.time.Instant @@ -77,6 +78,7 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac val timestamp: Instant, @Column(name = "signatures") + @Type(type = "corda-blob") val signatures: ByteArray? ) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt index c68942bd8e..a8e72accee 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt @@ -4,16 +4,12 @@ import net.corda.core.crypto.SecureHash import net.corda.core.flows.DistributionList.ReceiverDistributionList import net.corda.core.flows.DistributionList.SenderDistributionList import net.corda.core.flows.DistributionRecordKey -import net.corda.core.flows.DistributionRecordType -import net.corda.core.flows.DistributionRecords import net.corda.core.flows.ReceiverDistributionRecord -import net.corda.core.flows.RecoveryTimeWindow import net.corda.core.flows.SenderDistributionRecord import net.corda.core.flows.TransactionMetadata import net.corda.core.identity.CordaX500Name import net.corda.core.internal.NamedCacheFactory import net.corda.core.node.StatesToRecord -import net.corda.core.node.services.vault.Sort import net.corda.core.utilities.OpaqueBytes import net.corda.node.CordaClock import net.corda.node.services.EncryptionService @@ -32,7 +28,6 @@ import javax.persistence.Entity import javax.persistence.Id import javax.persistence.Lob import javax.persistence.Table -import javax.persistence.criteria.Predicate class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, cacheFactory: NamedCacheFactory, @@ -214,100 +209,6 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, } } - fun queryDistributionRecords(timeWindow: RecoveryTimeWindow, - recordType: DistributionRecordType = DistributionRecordType.ALL, - excludingTxnIds: Set = emptySet(), - orderByTimestamp: Sort.Direction? = null - ): DistributionRecords { - return when(recordType) { - DistributionRecordType.SENDER -> - DistributionRecords(senderRecords = - querySenderDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp)) - DistributionRecordType.RECEIVER -> - DistributionRecords(receiverRecords = - queryReceiverDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp)) - DistributionRecordType.ALL -> - DistributionRecords(senderRecords = - querySenderDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp), - receiverRecords = - queryReceiverDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp)) - } - } - - @Suppress("SpreadOperator") - fun querySenderDistributionRecords(timeWindow: RecoveryTimeWindow, - peers: Set = emptySet(), - excludingTxnIds: Set = emptySet(), - orderByTimestamp: Sort.Direction? = null - ): List { - return database.transaction { - val criteriaBuilder = session.criteriaBuilder - val criteriaQuery = criteriaBuilder.createQuery(DBSenderDistributionRecord::class.java) - val txnMetadata = criteriaQuery.from(DBSenderDistributionRecord::class.java) - val predicates = mutableListOf() - val compositeKey = txnMetadata.get("compositeKey") - predicates.add(criteriaBuilder.greaterThanOrEqualTo(compositeKey.get(PersistentKey::timestamp.name), timeWindow.fromTime)) - predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(compositeKey.get(PersistentKey::timestamp.name), timeWindow.untilTime))) - if (excludingTxnIds.isNotEmpty()) { - predicates.add(criteriaBuilder.and(criteriaBuilder.not(compositeKey.get(PersistentKey::txId.name).`in`( - excludingTxnIds.map { it.toString() })))) - } - if (peers.isNotEmpty()) { - val peerPartyIds = peers.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() } - predicates.add(criteriaBuilder.and(compositeKey.get(PersistentKey::peerPartyId.name).`in`(peerPartyIds))) - } - criteriaQuery.where(*predicates.toTypedArray()) - // optionally order by timestamp - orderByTimestamp?.let { - val orderCriteria = - when (orderByTimestamp) { - // when adding column position of 'group by' shift in case columns were removed - Sort.Direction.ASC -> criteriaBuilder.asc(compositeKey.get(PersistentKey::timestamp.name)) - Sort.Direction.DESC -> criteriaBuilder.desc(compositeKey.get(PersistentKey::timestamp.name)) - } - criteriaQuery.orderBy(orderCriteria) - } - session.createQuery(criteriaQuery).resultList - }.map { it.toSenderDistributionRecord() } - } - - @Suppress("SpreadOperator") - fun queryReceiverDistributionRecords(timeWindow: RecoveryTimeWindow, - initiators: Set = emptySet(), - excludingTxnIds: Set = emptySet(), - orderByTimestamp: Sort.Direction? = null - ): List { - return database.transaction { - val criteriaBuilder = session.criteriaBuilder - val criteriaQuery = criteriaBuilder.createQuery(DBReceiverDistributionRecord::class.java) - val txnMetadata = criteriaQuery.from(DBReceiverDistributionRecord::class.java) - val predicates = mutableListOf() - val compositeKey = txnMetadata.get("compositeKey") - val timestamp = compositeKey.get(PersistentKey::timestamp.name) - predicates.add(criteriaBuilder.greaterThanOrEqualTo(timestamp, timeWindow.fromTime)) - predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(timestamp, timeWindow.untilTime))) - if (excludingTxnIds.isNotEmpty()) { - val txId = compositeKey.get(PersistentKey::txId.name) - predicates.add(criteriaBuilder.and(criteriaBuilder.not(txId.`in`(excludingTxnIds.map { it.toString() })))) - } - if (initiators.isNotEmpty()) { - val initiatorPartyIds = initiators.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() } - predicates.add(criteriaBuilder.and(compositeKey.get(PersistentKey::peerPartyId.name).`in`(initiatorPartyIds))) - } - criteriaQuery.where(*predicates.toTypedArray()) - // optionally order by timestamp - orderByTimestamp?.let { - val orderCriteria = when (orderByTimestamp) { - // when adding column position of 'group by' shift in case columns were removed - Sort.Direction.ASC -> criteriaBuilder.asc(timestamp) - Sort.Direction.DESC -> criteriaBuilder.desc(timestamp) - } - criteriaQuery.orderBy(orderCriteria) - } - session.createQuery(criteriaQuery).resultList - }.map { it.toReceiverDistributionRecord() } - } - fun decryptHashedDistributionList(encryptedBytes: ByteArray): HashedDistributionList { return HashedDistributionList.decrypt(encryptedBytes, encryptionService) } diff --git a/node/src/main/resources/migration/node-core.changelog-v23.xml b/node/src/main/resources/migration/node-core.changelog-v23.xml index 095f5e6bee..235c65b31b 100644 --- a/node/src/main/resources/migration/node-core.changelog-v23.xml +++ b/node/src/main/resources/migration/node-core.changelog-v23.xml @@ -4,9 +4,16 @@ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" logicalFilePath="migration/node-services.changelog-init.xml"> - + - + + + + + + + + \ No newline at end of file diff --git a/node/src/main/resources/migration/node-core.changelog-v25.xml b/node/src/main/resources/migration/node-core.changelog-v25.xml index dee14a6fa2..75a4144667 100644 --- a/node/src/main/resources/migration/node-core.changelog-v25.xml +++ b/node/src/main/resources/migration/node-core.changelog-v25.xml @@ -39,7 +39,7 @@ - + diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt index c377b660ff..8b3ecc64df 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt @@ -8,9 +8,7 @@ import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.sign import net.corda.core.flows.DistributionList.ReceiverDistributionList import net.corda.core.flows.DistributionList.SenderDistributionList -import net.corda.core.flows.DistributionRecordType import net.corda.core.flows.ReceiverDistributionRecord -import net.corda.core.flows.RecoveryTimeWindow import net.corda.core.flows.SenderDistributionRecord import net.corda.core.flows.TransactionMetadata import net.corda.core.node.NodeInfo @@ -52,9 +50,7 @@ import org.junit.Rule import org.junit.Test import java.security.KeyPair import java.time.Clock -import java.time.Duration import java.time.Instant -import java.time.temporal.ChronoUnit import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertNull @@ -93,156 +89,6 @@ class DBTransactionStorageLedgerRecoveryTests { return transactionRecovery.clock.instant() } - @Test(timeout = 300_000) - fun `query local ledger for transactions with recovery peers within time window`() { - val beforeFirstTxn = now().truncatedTo(ChronoUnit.SECONDS) - val txn = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn) - transactionRecovery.addSenderTransactionRecoveryMetadata(txn.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT)))) - val timeWindow = RecoveryTimeWindow(fromTime = beforeFirstTxn, - untilTime = beforeFirstTxn.plus(1, ChronoUnit.MINUTES)) - val results = transactionRecovery.querySenderDistributionRecords(timeWindow) - assertEquals(1, results.size) - (transactionRecovery.clock as TestClock).advanceBy(Duration.ofSeconds(1)) - val afterFirstTxn = now().truncatedTo(ChronoUnit.SECONDS) - val txn2 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn2) - transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT)))) - assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow).size) - assertEquals(1, transactionRecovery.querySenderDistributionRecords(RecoveryTimeWindow(fromTime = afterFirstTxn, - untilTime = afterFirstTxn.plus(1, ChronoUnit.MINUTES))).size) - } - - @Test(timeout = 300_000) - fun `query local ledger for transactions within timeWindow and excluding remoteTransactionIds`() { - val transaction1 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(transaction1) - transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT)))) - val transaction2 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(transaction2) - transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT)))) - val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) - val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id)) - assertEquals(1, results.size) - assertEquals(transaction2.id, results[0].txId) - } - - @Test(timeout = 300_000) - fun `query local ledger for transactions within timeWindow and for given peers`() { - val transaction1 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(transaction1) - transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT)))) - val transaction2 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(transaction2) - transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(CHARLIE_NAME to ONLY_RELEVANT)))) - val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) - val results = transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)) - assertEquals(1, results.size) - assertEquals(transaction2.id, results[0].txId) - } - - @Test(timeout = 300_000) - fun `query local ledger by distribution record type`() { - val transaction1 = newTransaction() - // sender txn - transactionRecovery.addUnnotarisedTransaction(transaction1) - transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE)))) - val transaction2 = newTransaction() - // receiver txn - transactionRecovery.addUnnotarisedTransaction(transaction2) - val encryptedDL = transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id, - TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE)))) - transactionRecovery.addReceiverTransactionRecoveryMetadata(transaction2.id, BOB_NAME, - TransactionMetadata(BOB_NAME, ReceiverDistributionList(encryptedDL, ALL_VISIBLE))) - val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) - transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let { - assertEquals(2, it.size) - assertEquals(SecureHash.sha256(BOB_NAME.toString()), it.senderRecords[0].peerPartyId) - assertEquals(ALL_VISIBLE, it.senderRecords[0].senderStatesToRecord) - } - transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let { - assertEquals(1, it.size) - assertEquals(SecureHash.sha256(BOB_NAME.toString()), it.receiverRecords[0].peerPartyId) - assertEquals(ALL_VISIBLE, (HashedDistributionList.decrypt(it.receiverRecords[0].encryptedDistributionList.bytes, encryptionService)).peerHashToStatesToRecord.map { it.value }[0]) - } - val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL) - assertEquals(3, resultsAll.size) - } - - @Test(timeout = 300_000) - fun `query for sender distribution records by peers`() { - val txn1 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn1) - transactionRecovery.addSenderTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE)))) - val txn2 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn2) - transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT)))) - val txn3 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn3) - transactionRecovery.addSenderTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT, CHARLIE_NAME to ALL_VISIBLE)))) - val txn4 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn4) - transactionRecovery.addSenderTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ONLY_RELEVANT)))) - val txn5 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn5) - transactionRecovery.addSenderTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME, SenderDistributionList(ONLY_RELEVANT, emptyMap()))) - assertEquals(5, readSenderDistributionRecordFromDB().size) - - val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) - transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(BOB_NAME)).let { - assertEquals(2, it.size) - assertEquals(it[0].senderStatesToRecord, ALL_VISIBLE) - assertEquals(it[1].senderStatesToRecord, ONLY_RELEVANT) - } - assertEquals(1, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(ALICE_NAME)).size) - assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)).size) - } - - @Test(timeout = 300_000) - fun `query for receiver distribution records by initiator`() { - val txn1 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn1) - val encryptedDL1 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn1.id, - TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE, CHARLIE_NAME to ALL_VISIBLE)))) - transactionRecovery.addReceiverTransactionRecoveryMetadata(txn1.id, ALICE_NAME, - TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL1, ALL_VISIBLE))) - val txn2 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn2) - val encryptedDL2 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, - TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT)))) - transactionRecovery.addReceiverTransactionRecoveryMetadata(txn2.id, ALICE_NAME, - TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL2, ONLY_RELEVANT))) - val txn3 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn3) - val encryptedDL3 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn3.id, - TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to NONE)))) - transactionRecovery.addReceiverTransactionRecoveryMetadata(txn3.id, ALICE_NAME, - TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL3, NONE))) - val txn4 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn4) - val encryptedDL4 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn4.id, - TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE)))) - transactionRecovery.addReceiverTransactionRecoveryMetadata(txn4.id, BOB_NAME, - TransactionMetadata(BOB_NAME, ReceiverDistributionList(encryptedDL4, ALL_VISIBLE))) - val txn5 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(txn5) - val encryptedDL5 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn5.id, - TransactionMetadata(CHARLIE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT)))) - transactionRecovery.addReceiverTransactionRecoveryMetadata(txn5.id, CHARLIE_NAME, - TransactionMetadata(CHARLIE_NAME, ReceiverDistributionList(encryptedDL5, ONLY_RELEVANT))) - - val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) - transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(ALICE_NAME)).let { - assertEquals(3, it.size) - assertEquals(HashedDistributionList.decrypt(it[0].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ALL_VISIBLE) - assertEquals(HashedDistributionList.decrypt(it[1].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ONLY_RELEVANT) - assertEquals(HashedDistributionList.decrypt(it[2].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], NONE) - } - assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME)).size) - assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(CHARLIE_NAME)).size) - assertEquals(2, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME, CHARLIE_NAME)).size) - } - @Test(timeout = 300_000) fun `transaction without peers does not store recovery metadata in database`() { val senderTransaction = newTransaction()