Observer Nodes Cherry pick cleanup

This commit is contained in:
Katelyn Baker 2017-11-06 14:30:54 +00:00
parent 21f3fe2985
commit 160540680c
5 changed files with 12 additions and 9 deletions

View File

@ -24,6 +24,7 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.*
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.schemas.MappedSchema

View File

@ -10,6 +10,7 @@ import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.tee
import net.corda.core.messaging.DataFeed
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultQueryException
@ -104,13 +105,16 @@ class NodeVaultService(private val services: ServiceHub, private val hibernateCo
override val updates: Observable<Vault.Update<ContractState>>
get() = mutex.locked { _updatesInDbTx }
/** Same as notifyAll but with a single transaction. */
fun notify(statesToRecord: StatesToRecord, tx: CoreTransaction) = notifyAll(statesToRecord, listOf(tx))
/**
* Splits the provided [txns] into batches of [WireTransaction] and [NotaryChangeWireTransaction].
* This is required because the batches get aggregated into single updates, and we want to be able to
* indicate whether an update consists entirely of regular or notary change transactions, which may require
* different processing logic.
*/
override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>) {
fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>) {
if (statesToRecord == StatesToRecord.NONE)
return
@ -141,9 +145,6 @@ class NodeVaultService(private val services: ServiceHub, private val hibernateCo
if (notaryChangeTxns.isNotEmpty()) notifyNotaryChange(notaryChangeTxns.toList(), statesToRecord)
}
/** Same as notifyAll but with a single transaction. */
fun notify(tx: CoreTransaction) = notifyAll(listOf(tx))
private fun notifyRegular(txns: Iterable<WireTransaction>, statesToRecord: StatesToRecord) {
fun makeUpdate(tx: WireTransaction): Vault.Update<ContractState> {
val myKeys = services.keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } })

View File

@ -15,6 +15,7 @@ import net.corda.finance.schemas.CashSchemaV1
import net.corda.finance.schemas.SampleCashSchemaV2
import net.corda.finance.schemas.SampleCashSchemaV3
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSchemaV1

View File

@ -85,11 +85,11 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
services = object : MockServices(BOB_KEY, BOC_KEY, DUMMY_NOTARY_KEY) {
override val vaultService: VaultService = makeVaultService(database.hibernateConfig)
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)
}
(vaultService as NodeVautService).notifyAll(statesToRecord, txs.map { it.tx })
(vaultService as NodeVaultService).notifyAll(statesToRecord, txs.map { it.tx })
}
override fun jdbcSession() = database.createSession()
}

View File

@ -6,7 +6,6 @@ import net.corda.core.crypto.generateKeyPair
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.*
import net.corda.core.node.services.vault.PageSpecification
@ -29,6 +28,7 @@ import net.corda.node.utilities.CordaPersistence
import net.corda.testing.*
import net.corda.testing.contracts.fillWithSomeTestCash
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
@ -590,7 +590,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
txb.addCommand(Cash.Commands.Move(), MEGA_CORP_PUBKEY)
val wtx = txb.toWireTransaction(services)
database.transaction {
vaultService.notify(StatesToRecord.ONLY_RELEVANT, wtx)
(vaultService as NodeVaultService).notify(StatesToRecord.ONLY_RELEVANT, wtx)
}
// Check that it was ignored as irrelevant.
@ -598,7 +598,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
// Now try again and check it was accepted.
database.transaction {
vaultService.notify(StatesToRecord.ALL_VISIBLE, wtx)
(vaultService as NodeVaultService).notify(StatesToRecord.ALL_VISIBLE, wtx)
}
assertEquals(currentCashStates + 1, countCash())
}