Check for duplicate transaction records

This commit is contained in:
Andras Slemmer
2016-11-24 18:00:04 +00:00
parent cd34f3ae16
commit 87bd97d0c7
5 changed files with 31 additions and 13 deletions

View File

@ -32,7 +32,9 @@ interface TransactionStorage : ReadOnlyTransactionStorage {
/** /**
* Add a new transaction to the store. If the store already has a transaction with the same id it will be * Add a new transaction to the store. If the store already has a transaction with the same id it will be
* overwritten. * overwritten.
* @param transaction The transaction to be recorded.
* @return true if the transaction was recorded successfully, false if it was already recorded.
*/ */
// TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry. // TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry.
fun addTransaction(transaction: SignedTransaction) fun addTransaction(transaction: SignedTransaction): Boolean
} }

View File

@ -52,15 +52,15 @@ abstract class ServiceHubInternal : PluginServiceHub {
*/ */
internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable<SignedTransaction>) { internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable<SignedTransaction>) {
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
val recordedTransactions = txs.filter { writableStorageService.validatedTransactions.addTransaction(it) }
if (stateMachineRunId != null) { if (stateMachineRunId != null) {
txs.forEach { recordedTransactions.forEach {
storageService.stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id) storageService.stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id)
} }
} else { } else {
log.warn("Transaction recorded from outside of a state machine") log.warn("Transactions recorded from outside of a state machine")
} }
txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) } vaultService.notifyAll(recordedTransactions.map { it.tx })
vaultService.notifyAll(txs.map { it.tx })
} }
/** /**

View File

@ -7,6 +7,7 @@ import net.corda.core.node.services.TransactionStorage
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.node.utilities.* import net.corda.node.utilities.*
import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.exposedLogger
import org.jetbrains.exposed.sql.statements.InsertStatement import org.jetbrains.exposed.sql.statements.InsertStatement
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
@ -34,12 +35,22 @@ class DBTransactionStorage : TransactionStorage {
private val txStorage = synchronizedMap(TransactionsMap()) private val txStorage = synchronizedMap(TransactionsMap())
override fun addTransaction(transaction: SignedTransaction) { override fun addTransaction(transaction: SignedTransaction): Boolean {
synchronized(txStorage) { val recorded = synchronized(txStorage) {
val old = txStorage.get(transaction.id)
if (old == null) {
txStorage.put(transaction.id, transaction) txStorage.put(transaction.id, transaction)
updatesPublisher.onNext(transaction) updatesPublisher.onNext(transaction)
true
} else {
false
} }
} }
if (!recorded) {
exposedLogger.warn("Duplicate recording of transaction ${transaction.id}")
}
return recorded
}
override fun getTransaction(id: SecureHash): SignedTransaction? { override fun getTransaction(id: SecureHash): SignedTransaction? {
synchronized(txStorage) { synchronized(txStorage) {

View File

@ -561,11 +561,12 @@ class TwoPartyTradeFlowTests {
override val updates: Observable<SignedTransaction> override val updates: Observable<SignedTransaction>
get() = delegate.updates get() = delegate.updates
override fun addTransaction(transaction: SignedTransaction) { override fun addTransaction(transaction: SignedTransaction): Boolean {
databaseTransaction(database) { databaseTransaction(database) {
records.add(TxRecord.Add(transaction)) records.add(TxRecord.Add(transaction))
delegate.addTransaction(transaction) delegate.addTransaction(transaction)
} }
return true
} }
override fun getTransaction(id: SecureHash): SignedTransaction? { override fun getTransaction(id: SecureHash): SignedTransaction? {

View File

@ -1,5 +1,6 @@
package net.corda.testing.node package net.corda.testing.node
import kotlinx.support.jdk8.collections.putIfAbsent
import net.corda.core.contracts.Attachment import net.corda.core.contracts.Attachment
import net.corda.core.crypto.* import net.corda.core.crypto.*
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
@ -139,10 +140,13 @@ open class MockTransactionStorage : TransactionStorage {
private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction) private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction)
override fun addTransaction(transaction: SignedTransaction) { override fun addTransaction(transaction: SignedTransaction): Boolean {
txns[transaction.id] = transaction val recorded = txns.putIfAbsent(transaction.id, transaction) == null
if (recorded) {
notify(transaction) notify(transaction)
} }
return recorded
}
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id] override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]
} }