Added notifyVault flag to recordTransaction and skip notify for irrelevant TX (#1250)

* Add notifyVault flag to recordTransaction to skip notifying the vault for transactions from ResolveTransactionFlow.

* added methods for use in Java

* reverted format changes

* addressed PR issues
changed recordTransaction method signature
This commit is contained in:
Patrick Kuo 2017-08-18 09:46:59 +01:00 committed by GitHub
parent e49d3aa834
commit ae85759067
8 changed files with 37 additions and 20 deletions

View File

@ -92,7 +92,7 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
// half way through, it's no big deal, although it might result in us attempting to re-download data
// redundantly next time we attempt verification.
it.verify(serviceHub)
serviceHub.recordTransactions(it)
serviceHub.recordTransactions(false, it)
}
return signedTransaction?.let {

View File

@ -68,18 +68,35 @@ interface ServiceHub : ServicesForResolution {
/**
* Stores the given [SignedTransaction]s in the local transaction storage and then sends them to the vault for
* further processing. This is expected to be run within a database transaction.
* further processing if [notifyVault] is true. This is expected to be run within a database transaction.
*
* @param txs The transactions to record.
* @param notifyVault indicate if the vault should be notified for the update.
*/
fun recordTransactions(txs: Iterable<SignedTransaction>)
fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>)
/**
* Stores the given [SignedTransaction]s in the local transaction storage and then sends them to the vault for
* further processing if [notifyVault] is true. This is expected to be run within a database transaction.
*/
fun recordTransactions(notifyVault: Boolean, first: SignedTransaction, vararg remaining: SignedTransaction) {
recordTransactions(notifyVault, listOf(first, *remaining))
}
/**
* Stores the given [SignedTransaction]s in the local transaction storage and then sends them to the vault for
* further processing. This is expected to be run within a database transaction.
*/
fun recordTransactions(first: SignedTransaction, vararg remaining: SignedTransaction) {
recordTransactions(listOf(first, *remaining))
recordTransactions(true, first, *remaining)
}
/**
* Stores the given [SignedTransaction]s in the local transaction storage and then sends them to the vault for
* further processing. This is expected to be run within a database transaction.
*/
fun recordTransactions(txs: Iterable<SignedTransaction>) {
recordTransactions(true, txs)
}
/**
@ -92,8 +109,7 @@ interface ServiceHub : ServicesForResolution {
val stx = validatedTransactions.getTransaction(stateRef.txhash) ?: throw TransactionResolutionException(stateRef.txhash)
return if (stx.isNotaryChangeTransaction()) {
stx.resolveNotaryChangeTransaction(this).outputs[stateRef.index]
}
else stx.tx.outputs[stateRef.index]
} else stx.tx.outputs[stateRef.index]
}
/**
@ -106,8 +122,7 @@ interface ServiceHub : ServicesForResolution {
val stx = validatedTransactions.getTransaction(stateRef.txhash) ?: throw TransactionResolutionException(stateRef.txhash)
return if (stx.isNotaryChangeTransaction()) {
stx.resolveNotaryChangeTransaction(this).outRef<T>(stateRef.index)
}
else {
} else {
stx.tx.outRef<T>(stateRef.index)
}
}

View File

@ -801,9 +801,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
return flowFactories[initiatingFlowClass]
}
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
database.transaction {
super.recordTransactions(txs)
super.recordTransactions(notifyVault, txs)
}
}
override fun jdbcSession(): Connection = database.createSession()

View File

@ -89,8 +89,8 @@ interface ServiceHubInternal : PluginServiceHub {
val database: CordaPersistence
val configuration: NodeConfiguration
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
require (txs.any()) { "No transactions passed in for recording" }
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
require(txs.any()) { "No transactions passed in for recording" }
val recordedTransactions = txs.filter { validatedTransactions.addTransaction(it) }
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
if (stateMachineRunId != null) {
@ -101,9 +101,11 @@ interface ServiceHubInternal : PluginServiceHub {
log.warn("Transactions recorded from outside of a state machine")
}
if (notifyVault) {
val toNotify = recordedTransactions.map { if (it.isNotaryChangeTransaction()) it.notaryChangeTx else it.tx }
vaultService.notifyAll(toNotify)
}
}
/**
* Starts an already constructed flow. Note that you must be on the server thread to call this method. [FlowInitiator]

View File

@ -28,7 +28,6 @@ import java.lang.Exception
import java.util.*
import javax.persistence.Tuple
class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
val vault: VaultService) : SingletonSerializeAsToken(), VaultQueryService {
companion object {

View File

@ -75,7 +75,7 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
services = object : MockServices(BOB_KEY, BOC_KEY, DUMMY_NOTARY_KEY) {
override val vaultService: VaultService = makeVaultService(dataSourceProps, hibernateConfig)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)
}

View File

@ -94,13 +94,14 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val originalVaultQuery = vaultQuery
val services2 = object : MockServices() {
override val vaultService: VaultService get() = originalVault
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)
vaultService.notify(stx.tx)
}
}
override val vaultQueryService : VaultQueryService get() = originalVaultQuery
override val vaultQueryService: VaultQueryService get() = originalVaultQuery
}
val w2 = services2.vaultQueryService.queryBy<Cash.State>().states

View File

@ -60,7 +60,7 @@ open class MockServices(vararg val keys: KeyPair) : ServiceHub {
val key: KeyPair get() = keys.first()
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
txs.forEach {
stateMachineRecordedTransactionMapping.addMapping(StateMachineRunId.createRandom(), it.id)
}
@ -229,7 +229,7 @@ fun makeTestDatabaseAndMockServices(customSchemas: Set<MappedSchema> = setOf(Com
object : MockServices(*(keys.toTypedArray())) {
override val vaultService: VaultService = makeVaultService(dataSourceProps, hibernateConfig)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)
}