From 87bd97d0c78f9d8c922291910bb8e2c9c711c70b Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 24 Nov 2016 18:00:04 +0000 Subject: [PATCH] Check for duplicate transaction records --- .../core/node/services/TransactionStorage.kt | 4 +++- .../node/services/api/ServiceHubInternal.kt | 8 ++++---- .../persistence/DBTransactionStorage.kt | 19 +++++++++++++++---- .../messaging/TwoPartyTradeProtocolTests.kt | 3 ++- .../net/corda/testing/node/MockServices.kt | 10 +++++++--- 5 files changed, 31 insertions(+), 13 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt b/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt index f1f5178d53..a6788cc1d2 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt @@ -32,7 +32,9 @@ interface TransactionStorage : ReadOnlyTransactionStorage { /** * Add a new transaction to the store. If the store already has a transaction with the same id it will be * overwritten. + * @param transaction The transaction to be recorded. + * @return true if the transaction was recorded successfully, false if it was already recorded. */ // TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry. - fun addTransaction(transaction: SignedTransaction) + fun addTransaction(transaction: SignedTransaction): Boolean } diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 1a9c5a9a48..ac32fbf048 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -52,15 +52,15 @@ abstract class ServiceHubInternal : PluginServiceHub { */ internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable) { val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id + val recordedTransactions = txs.filter { writableStorageService.validatedTransactions.addTransaction(it) } if (stateMachineRunId != null) { - txs.forEach { + recordedTransactions.forEach { storageService.stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id) } } else { - log.warn("Transaction recorded from outside of a state machine") + log.warn("Transactions recorded from outside of a state machine") } - txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) } - vaultService.notifyAll(txs.map { it.tx }) + vaultService.notifyAll(recordedTransactions.map { it.tx }) } /** diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 8f2f8d797a..fca852a687 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -7,6 +7,7 @@ import net.corda.core.node.services.TransactionStorage import net.corda.core.transactions.SignedTransaction import net.corda.node.utilities.* import org.jetbrains.exposed.sql.ResultRow +import org.jetbrains.exposed.sql.exposedLogger import org.jetbrains.exposed.sql.statements.InsertStatement import rx.Observable import rx.subjects.PublishSubject @@ -34,11 +35,21 @@ class DBTransactionStorage : TransactionStorage { private val txStorage = synchronizedMap(TransactionsMap()) - override fun addTransaction(transaction: SignedTransaction) { - synchronized(txStorage) { - txStorage.put(transaction.id, transaction) - updatesPublisher.onNext(transaction) + override fun addTransaction(transaction: SignedTransaction): Boolean { + val recorded = synchronized(txStorage) { + val old = txStorage.get(transaction.id) + if (old == null) { + txStorage.put(transaction.id, transaction) + updatesPublisher.onNext(transaction) + true + } else { + false + } } + if (!recorded) { + exposedLogger.warn("Duplicate recording of transaction ${transaction.id}") + } + return recorded } override fun getTransaction(id: SecureHash): SignedTransaction? { diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeProtocolTests.kt index fdc7a7678f..6e5c6bcd1f 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -561,11 +561,12 @@ class TwoPartyTradeFlowTests { override val updates: Observable get() = delegate.updates - override fun addTransaction(transaction: SignedTransaction) { + override fun addTransaction(transaction: SignedTransaction): Boolean { databaseTransaction(database) { records.add(TxRecord.Add(transaction)) delegate.addTransaction(transaction) } + return true } override fun getTransaction(id: SecureHash): SignedTransaction? { diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index da29153ac3..5633dd4dab 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -1,5 +1,6 @@ package net.corda.testing.node +import kotlinx.support.jdk8.collections.putIfAbsent import net.corda.core.contracts.Attachment import net.corda.core.crypto.* import net.corda.core.flows.FlowLogic @@ -139,9 +140,12 @@ open class MockTransactionStorage : TransactionStorage { private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction) - override fun addTransaction(transaction: SignedTransaction) { - txns[transaction.id] = transaction - notify(transaction) + override fun addTransaction(transaction: SignedTransaction): Boolean { + val recorded = txns.putIfAbsent(transaction.id, transaction) == null + if (recorded) { + notify(transaction) + } + return recorded } override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]