mirror of
https://github.com/corda/corda.git
synced 2025-02-18 16:40:55 +00:00
Decommissioned InMemoryVaultService service (all dependent Tests updated to use NodeVaultService)
This commit is contained in:
parent
c7d98b8c6b
commit
75f671a446
@ -1,142 +0,0 @@
|
||||
package com.r3corda.core.testing
|
||||
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.bufferUntilSubscribed
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.Vault
|
||||
import com.r3corda.core.node.services.VaultService
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.transactions.TransactionBuilder
|
||||
import com.r3corda.core.transactions.WireTransaction
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* This class implements a simple, in memory vault that tracks states that are owned by us, and also has a convenience
|
||||
* method to auto-generate some self-issued cash states that can be used for test trading. A real vault would persist
|
||||
* states relevant to us into a database and once such a vault is implemented, this scaffolding can be removed.
|
||||
*/
|
||||
@ThreadSafe
|
||||
open class InMemoryVaultService(protected val services: ServiceHub) : SingletonSerializeAsToken(), VaultService {
|
||||
open protected val log = loggerFor<InMemoryVaultService>()
|
||||
|
||||
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
|
||||
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
|
||||
// to vault somewhere.
|
||||
protected class InnerState {
|
||||
var vault = Vault(emptyList<StateAndRef<ContractState>>())
|
||||
val _updatesPublisher = PublishSubject.create<Vault.Update>()
|
||||
}
|
||||
|
||||
protected val mutex = ThreadBox(InnerState())
|
||||
|
||||
override val currentVault: Vault get() = mutex.locked { vault }
|
||||
|
||||
override val updates: Observable<Vault.Update>
|
||||
get() = mutex.content._updatesPublisher
|
||||
|
||||
override fun track(): Pair<Vault, Observable<Vault.Update>> {
|
||||
return mutex.locked {
|
||||
Pair(vault, updates.bufferUntilSubscribed())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a snapshot of the heads of LinearStates.
|
||||
*/
|
||||
override val linearHeads: Map<UniqueIdentifier, StateAndRef<LinearState>>
|
||||
get() = currentVault.let { vault ->
|
||||
vault.states.filterStatesOfType<LinearState>().associateBy { it.state.data.linearId }.mapValues { it.value }
|
||||
}
|
||||
|
||||
override fun notifyAll(txns: Iterable<WireTransaction>): Vault {
|
||||
val ourKeys = services.keyManagementService.keys.keys
|
||||
|
||||
// Note how terribly incomplete this all is!
|
||||
//
|
||||
// - We don't notify anyone of anything, there are no event listeners.
|
||||
// - We don't handle or even notice invalidations due to double spends of things in our vault.
|
||||
// - We have no concept of confidence (for txns where there is no definite finality).
|
||||
// - No notification that keys are used, for the case where we observe a spend of our own states.
|
||||
// - No ability to create complex spends.
|
||||
// - No logging or tracking of how the vault got into this state.
|
||||
// - No persistence.
|
||||
// - Does tx relevancy calculation and key management need to be interlocked? Probably yes.
|
||||
//
|
||||
// ... and many other things .... (Wallet.java in bitcoinj is several thousand lines long)
|
||||
|
||||
var netDelta = Vault.NoUpdate
|
||||
val changedVault = mutex.locked {
|
||||
// Starting from the current vault, keep applying the transaction updates, calculating a new vault each
|
||||
// time, until we get to the result (this is perhaps a bit inefficient, but it's functional and easily
|
||||
// unit tested).
|
||||
val vaultAndNetDelta = txns.fold(Pair(currentVault, Vault.NoUpdate)) { vaultAndDelta, tx ->
|
||||
val (vault, delta) = vaultAndDelta.first.update(tx, ourKeys)
|
||||
val combinedDelta = delta + vaultAndDelta.second
|
||||
Pair(vault, combinedDelta)
|
||||
}
|
||||
|
||||
vault = vaultAndNetDelta.first
|
||||
netDelta = vaultAndNetDelta.second
|
||||
return@locked vault
|
||||
}
|
||||
|
||||
if (netDelta != Vault.NoUpdate) {
|
||||
mutex.locked {
|
||||
_updatesPublisher.onNext(netDelta)
|
||||
}
|
||||
}
|
||||
return changedVault
|
||||
}
|
||||
|
||||
override fun generateSpend(tx: TransactionBuilder, amount: Amount<Currency>, to: PublicKey, onlyFromParties: Set<Party>?): Pair<TransactionBuilder, List<PublicKey>> {
|
||||
// TODO: decommission entirely this Vault implementation
|
||||
throw UnsupportedOperationException("Should be using NodeVaultService implementation!")
|
||||
}
|
||||
|
||||
|
||||
private fun isRelevant(state: ContractState, ourKeys: Set<PublicKey>): Boolean {
|
||||
return if (state is OwnableState) {
|
||||
state.owner in ourKeys
|
||||
} else if (state is LinearState) {
|
||||
// It's potentially of interest to the vault
|
||||
state.isRelevant(ourKeys)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
private fun Vault.update(tx: WireTransaction, ourKeys: Set<PublicKey>): Pair<Vault, Vault.Update> {
|
||||
val ourNewStates = tx.outputs.
|
||||
filter { isRelevant(it.data, ourKeys) }.
|
||||
map { tx.outRef<ContractState>(it.data) }
|
||||
|
||||
// Now calculate the states that are being spent by this transaction.
|
||||
val consumed: Set<StateRef> = states.map { it.ref }.intersect(tx.inputs)
|
||||
|
||||
// Is transaction irrelevant?
|
||||
if (consumed.isEmpty() && ourNewStates.isEmpty()) {
|
||||
log.trace { "tx ${tx.id} was irrelevant to this vault, ignoring" }
|
||||
return Pair(this, Vault.NoUpdate)
|
||||
}
|
||||
|
||||
val change = Vault.Update(consumed, HashSet(ourNewStates))
|
||||
|
||||
// And calculate the new vault.
|
||||
val newStates = states.filter { it.ref !in consumed } + ourNewStates
|
||||
|
||||
log.trace {
|
||||
"Applied tx ${tx.id.prefixChars()} to the vault: consumed ${consumed.size} states and added ${newStates.size}"
|
||||
}
|
||||
|
||||
return Pair(Vault(newStates), change)
|
||||
}
|
||||
|
||||
}
|
@ -7,7 +7,6 @@ import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.*
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolLogicRefFactory
|
||||
import com.r3corda.core.testing.InMemoryVaultService
|
||||
import com.r3corda.core.transactions.SignedTransaction
|
||||
import com.r3corda.node.serialization.NodeClock
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
@ -17,6 +16,7 @@ import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.services.persistence.DataVending
|
||||
import com.r3corda.node.services.schema.NodeSchemaService
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import com.r3corda.node.services.vault.NodeVaultService
|
||||
import com.r3corda.testing.MOCK_IDENTITY_SERVICE
|
||||
import com.r3corda.testing.node.MockNetworkMapCache
|
||||
import com.r3corda.testing.node.MockStorageService
|
||||
@ -37,7 +37,7 @@ open class MockServiceHubInternal(
|
||||
val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory(),
|
||||
val schemas: SchemaService? = NodeSchemaService()
|
||||
) : ServiceHubInternal() {
|
||||
override val vaultService: VaultService = customVault ?: InMemoryVaultService(this)
|
||||
override val vaultService: VaultService = customVault ?: NodeVaultService(this)
|
||||
override val keyManagementService: KeyManagementService
|
||||
get() = keyManagement ?: throw UnsupportedOperationException()
|
||||
override val identityService: IdentityService
|
||||
|
@ -6,23 +6,25 @@ import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.days
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.recordTransactions
|
||||
import com.r3corda.core.node.services.VaultService
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolLogicRef
|
||||
import com.r3corda.core.protocols.ProtocolLogicRefFactory
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.transactions.SignedTransaction
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.node.services.events.NodeSchedulerService
|
||||
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import com.r3corda.node.services.vault.NodeVaultService
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.r3corda.node.utilities.configureDatabase
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import com.r3corda.testing.ALICE_KEY
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork
|
||||
import com.r3corda.testing.node.MockKeyManagementService
|
||||
import com.r3corda.testing.node.TestClock
|
||||
import com.r3corda.testing.node.makeTestDataSourceProperties
|
||||
import com.r3corda.testing.node.*
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -50,11 +52,12 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
|
||||
val factory = ProtocolLogicRefFactory(mapOf(Pair(TestProtocolLogic::class.java.name, setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name))))
|
||||
|
||||
val services: MockServiceHubInternal
|
||||
lateinit var services: MockServiceHubInternal
|
||||
|
||||
lateinit var scheduler: NodeSchedulerService
|
||||
lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var countDown: CountDownLatch
|
||||
lateinit var smmHasRemovedAllProtocols: CountDownLatch
|
||||
|
||||
@ -69,14 +72,6 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
val testReference: NodeSchedulerServiceTest
|
||||
}
|
||||
|
||||
init {
|
||||
val kms = MockKeyManagementService(ALICE_KEY)
|
||||
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), persistenceTx = { it() })
|
||||
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
|
||||
override val testReference = this@NodeSchedulerServiceTest
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
countDown = CountDownLatch(1)
|
||||
@ -84,7 +79,28 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
calls = 0
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
val database = dataSourceAndDatabase.second
|
||||
database = dataSourceAndDatabase.second
|
||||
|
||||
// Switched from InMemoryVault usage to NodeVault
|
||||
databaseTransaction(database) {
|
||||
val services1 = object : MockServices() {
|
||||
override val vaultService: VaultService = NodeVaultService(this)
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
storageService.validatedTransactions.addTransaction(stx)
|
||||
vaultService.notify(stx.tx)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
val kms = MockKeyManagementService(ALICE_KEY)
|
||||
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), persistenceTx = { it() })
|
||||
services = object : MockServiceHubInternal(customVault = services1.vaultService, overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
|
||||
override val testReference = this@NodeSchedulerServiceTest
|
||||
}
|
||||
}
|
||||
|
||||
scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor)
|
||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
||||
val mockSMM = StateMachineManager(services, listOf(services), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database)
|
||||
@ -257,20 +273,24 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
}
|
||||
|
||||
private fun scheduleTX(instant: Instant, increment: Int = 1): ScheduledStateRef? {
|
||||
var scheduledRef: ScheduledStateRef? = null
|
||||
apply {
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
val state = TestState(factory.create(TestProtocolLogic::class.java, increment), instant)
|
||||
val usefulTX = TransactionType.General.Builder(null).apply {
|
||||
addOutputState(state, DUMMY_NOTARY)
|
||||
addCommand(Command(), freshKey.public)
|
||||
signWith(freshKey)
|
||||
}.toSignedTransaction()
|
||||
val txHash = usefulTX.id
|
||||
|
||||
services.recordTransactions(usefulTX)
|
||||
scheduledRef = ScheduledStateRef(StateRef(txHash, 0), state.instant)
|
||||
scheduler.scheduleStateActivity(scheduledRef!!)
|
||||
var scheduledRef: ScheduledStateRef? = null
|
||||
|
||||
databaseTransaction(database) {
|
||||
apply {
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
val state = TestState(factory.create(TestProtocolLogic::class.java, increment), instant)
|
||||
val usefulTX = TransactionType.General.Builder(null).apply {
|
||||
addOutputState(state, DUMMY_NOTARY)
|
||||
addCommand(Command(), freshKey.public)
|
||||
signWith(freshKey)
|
||||
}.toSignedTransaction()
|
||||
val txHash = usefulTX.id
|
||||
|
||||
services.recordTransactions(usefulTX)
|
||||
scheduledRef = ScheduledStateRef(StateRef(txHash, 0), state.instant)
|
||||
scheduler.scheduleStateActivity(scheduledRef!!)
|
||||
}
|
||||
}
|
||||
return scheduledRef
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ import com.r3corda.core.node.services.KeyManagementService
|
||||
import com.r3corda.core.node.services.ServiceInfo
|
||||
import com.r3corda.core.node.services.VaultService
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.testing.InMemoryVaultService
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.internal.AbstractNode
|
||||
|
Loading…
x
Reference in New Issue
Block a user