Merged in aslemmer-warn-on-double-tx-record (pull request #546)

Check for duplicate transaction records
This commit is contained in:
Andras Slemmer 2016-11-28 10:34:34 +00:00
commit 811b0e6a8d
5 changed files with 31 additions and 13 deletions

View File

@ -32,7 +32,9 @@ interface TransactionStorage : ReadOnlyTransactionStorage {
/**
* Add a new transaction to the store. If the store already has a transaction with the same id it will be
* overwritten.
* @param transaction The transaction to be recorded.
* @return true if the transaction was recorded successfully, false if it was already recorded.
*/
// TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry.
fun addTransaction(transaction: SignedTransaction)
fun addTransaction(transaction: SignedTransaction): Boolean
}

View File

@ -52,15 +52,15 @@ abstract class ServiceHubInternal : PluginServiceHub {
*/
internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable<SignedTransaction>) {
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
val recordedTransactions = txs.filter { writableStorageService.validatedTransactions.addTransaction(it) }
if (stateMachineRunId != null) {
txs.forEach {
recordedTransactions.forEach {
storageService.stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id)
}
} else {
log.warn("Transaction recorded from outside of a state machine")
log.warn("Transactions recorded from outside of a state machine")
}
txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) }
vaultService.notifyAll(txs.map { it.tx })
vaultService.notifyAll(recordedTransactions.map { it.tx })
}
/**

View File

@ -7,6 +7,7 @@ import net.corda.core.node.services.TransactionStorage
import net.corda.core.transactions.SignedTransaction
import net.corda.node.utilities.*
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.exposedLogger
import org.jetbrains.exposed.sql.statements.InsertStatement
import rx.Observable
import rx.subjects.PublishSubject
@ -34,11 +35,21 @@ class DBTransactionStorage : TransactionStorage {
private val txStorage = synchronizedMap(TransactionsMap())
override fun addTransaction(transaction: SignedTransaction) {
synchronized(txStorage) {
txStorage.put(transaction.id, transaction)
updatesPublisher.onNext(transaction)
override fun addTransaction(transaction: SignedTransaction): Boolean {
val recorded = synchronized(txStorage) {
val old = txStorage.get(transaction.id)
if (old == null) {
txStorage.put(transaction.id, transaction)
updatesPublisher.onNext(transaction)
true
} else {
false
}
}
if (!recorded) {
exposedLogger.warn("Duplicate recording of transaction ${transaction.id}")
}
return recorded
}
override fun getTransaction(id: SecureHash): SignedTransaction? {

View File

@ -561,11 +561,12 @@ class TwoPartyTradeFlowTests {
override val updates: Observable<SignedTransaction>
get() = delegate.updates
override fun addTransaction(transaction: SignedTransaction) {
override fun addTransaction(transaction: SignedTransaction): Boolean {
databaseTransaction(database) {
records.add(TxRecord.Add(transaction))
delegate.addTransaction(transaction)
}
return true
}
override fun getTransaction(id: SecureHash): SignedTransaction? {

View File

@ -1,5 +1,6 @@
package net.corda.testing.node
import kotlinx.support.jdk8.collections.putIfAbsent
import net.corda.core.contracts.Attachment
import net.corda.core.crypto.*
import net.corda.core.flows.FlowLogic
@ -139,9 +140,12 @@ open class MockTransactionStorage : TransactionStorage {
private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction)
override fun addTransaction(transaction: SignedTransaction) {
txns[transaction.id] = transaction
notify(transaction)
override fun addTransaction(transaction: SignedTransaction): Boolean {
val recorded = txns.putIfAbsent(transaction.id, transaction) == null
if (recorded) {
notify(transaction)
}
return recorded
}
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]