Removed the StorageService and puts its components directly into the service hub

This commit is contained in:
Shams Asari
2017-06-29 11:13:40 +01:00
parent ecc96b29b0
commit a08f701dc5
45 changed files with 240 additions and 316 deletions

View File

@ -45,7 +45,10 @@ import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.RegistrationResponse
import net.corda.node.services.network.NodeRegistration
import net.corda.node.services.network.PersistentNetworkMapService
import net.corda.node.services.persistence.*
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.FlowStateMachineImpl
@ -70,7 +73,6 @@ import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Modifier.*
import java.net.JarURLConnection
import java.net.URI
import java.nio.file.FileAlreadyExistsException
import java.nio.file.Path
import java.nio.file.Paths
import java.security.KeyPair
@ -120,10 +122,14 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private val flowFactories = ConcurrentHashMap<Class<out FlowLogic<*>>, InitiatedFlowFactory<*>>()
protected val partyKeys = mutableSetOf<KeyPair>()
val services = object : ServiceHubInternal() {
val services = object : ServiceHubInternal {
override val attachments: AttachmentStorage get() = this@AbstractNode.attachments
override val uploaders: List<FileUploader> get() = this@AbstractNode.uploaders
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage
get() = this@AbstractNode.transactionMappings
override val validatedTransactions: TransactionStorage get() = this@AbstractNode.transactions
override val networkService: MessagingService get() = network
override val networkMapCache: NetworkMapCacheInternal get() = netMapCache
override val storageService: TxWritableStorageService get() = storage
override val vaultService: VaultService get() = vault
override val vaultQueryService: VaultQueryService get() = vaultQuery
override val keyManagementService: KeyManagementService get() = keyManagement
@ -157,7 +163,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
database.transaction {
recordTransactionsInternal(storage, txs)
super.recordTransactions(txs)
}
}
}
@ -167,9 +173,12 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
lateinit var info: NodeInfo
lateinit var storage: TxWritableStorageService
lateinit var checkpointStorage: CheckpointStorage
lateinit var smm: StateMachineManager
lateinit var attachments: NodeAttachmentService
lateinit var transactions: TransactionStorage
lateinit var transactionMappings: StateMachineRecordedTransactionMappingStorage
lateinit var uploaders: List<FileUploader>
lateinit var vault: VaultService
lateinit var vaultQuery: VaultQueryService
lateinit var keyManagement: KeyManagementService
@ -469,9 +478,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
*/
private fun makeServices(keyStoreWrapper: KeyStoreWrapper): MutableList<Any> {
val keyStore = keyStoreWrapper.keyStore
val storageServices = initialiseStorageService(configuration.baseDirectory)
storage = storageServices.first
checkpointStorage = storageServices.second
attachments = createAttachmentStorage()
transactions = createTransactionStorage()
transactionMappings = DBTransactionMappingStorage()
checkpointStorage = DBCheckpointStorage()
netMapCache = InMemoryNetworkMapCache(services)
network = makeMessagingService()
schemas = makeSchemaService()
@ -490,11 +500,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
keyManagement = makeKeyManagementService(identity)
scheduler = NodeSchedulerService(services, database, unfinishedSchedules = busyNodeLatch)
val tokenizableServices = mutableListOf(storage, network, vault, vaultQuery, keyManagement, identity, platformClock, scheduler)
val tokenizableServices = mutableListOf(attachments, network, vault, vaultQuery, keyManagement, identity, platformClock, scheduler)
makeAdvertisedServices(tokenizableServices)
return tokenizableServices
}
protected open fun createTransactionStorage(): TransactionStorage = DBTransactionStorage()
private fun scanCordapps(): ScanResult? {
val scanPackage = System.getProperty("net.corda.node.cordapp.scan.package")
val paths = if (scanPackage != null) {
@ -548,9 +560,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
private fun initUploaders() {
val uploaders: List<FileUploader> = listOf(storage.attachments as NodeAttachmentService) +
cordappServices.values.filterIsInstance(AcceptsFileUpload::class.java)
(storage as StorageServiceImpl).initUploaders(uploaders)
uploaders = listOf(attachments) + cordappServices.values.filterIsInstance(AcceptsFileUpload::class.java)
}
private fun makeVaultObservers() {
@ -625,7 +635,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
* Run any tasks that are needed to ensure the node is in a correct state before running start().
*/
open fun setup(): AbstractNode {
createNodeDir()
configuration.baseDirectory.createDirectories()
return this
}
@ -761,22 +771,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected abstract fun startMessagingService(rpcOps: RPCOps)
protected open fun initialiseStorageService(dir: Path): Pair<TxWritableStorageService, CheckpointStorage> {
val attachments = makeAttachmentStorage(dir)
val checkpointStorage = DBCheckpointStorage()
val transactionStorage = DBTransactionStorage()
val stateMachineTransactionMappingStorage = DBTransactionMappingStorage()
return Pair(
constructStorageService(attachments, transactionStorage, stateMachineTransactionMappingStorage),
checkpointStorage
)
}
protected open fun constructStorageService(attachments: AttachmentStorage,
transactionStorage: TransactionStorage,
stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage) =
StorageServiceImpl(attachments, transactionStorage, stateMachineRecordedTransactionMappingStorage)
protected fun obtainLegalIdentity(): PartyAndCertificate = identityKeyPair.first
protected fun obtainLegalIdentityKey(): KeyPair = identityKeyPair.second
private val identityKeyPair by lazy { obtainKeyPair("identity", configuration.myLegalName) }
@ -846,18 +840,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected open fun generateKeyPair() = cryptoGenerateKeyPair()
protected fun makeAttachmentStorage(dir: Path): AttachmentStorage {
val attachmentsDir = dir / "attachments"
try {
attachmentsDir.createDirectory()
} catch (e: FileAlreadyExistsException) {
}
private fun createAttachmentStorage(): NodeAttachmentService {
val attachmentsDir = (configuration.baseDirectory / "attachments").createDirectories()
return NodeAttachmentService(attachmentsDir, configuration.dataSourceProperties, services.monitoringService.metrics)
}
protected fun createNodeDir() {
configuration.baseDirectory.createDirectories()
}
}
private class KeyStoreWrapper(val keyStore: KeyStore, val storePath: Path, private val storePassword: String) {

View File

@ -12,7 +12,6 @@ import net.corda.core.identity.Party
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
@ -76,7 +75,7 @@ class CordaRPCOpsImpl(
override fun verifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return database.transaction {
services.storageService.validatedTransactions.track()
services.validatedTransactions.track()
}
}
@ -92,7 +91,7 @@ class CordaRPCOpsImpl(
override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
return database.transaction {
services.storageService.stateMachineRecordedTransactionMapping.track()
services.stateMachineRecordedTransactionMapping.track()
}
}
@ -143,21 +142,21 @@ class CordaRPCOpsImpl(
override fun attachmentExists(id: SecureHash): Boolean {
// TODO: this operation should not require an explicit transaction
return database.transaction {
services.storageService.attachments.openAttachment(id) != null
services.attachments.openAttachment(id) != null
}
}
override fun openAttachment(id: SecureHash): InputStream {
// TODO: this operation should not require an explicit transaction
return database.transaction {
services.storageService.attachments.openAttachment(id)!!.open()
services.attachments.openAttachment(id)!!.open()
}
}
override fun uploadAttachment(jar: InputStream): SecureHash {
// TODO: this operation should not require an explicit transaction
return database.transaction {
services.storageService.attachments.importAttachment(jar)
services.attachments.importAttachment(jar)
}
}
@ -166,7 +165,7 @@ class CordaRPCOpsImpl(
override fun currentNodeTime(): Instant = Instant.now(services.clock)
@Suppress("OverridingDeprecatedMember", "DEPRECATION")
override fun uploadFile(dataType: String, name: String?, file: InputStream): String {
val acceptor = services.storageService.uploaders.firstOrNull { it.accepts(dataType) }
val acceptor = services.uploaders.firstOrNull { it.accepts(dataType) }
return database.transaction {
acceptor?.upload(file) ?: throw RuntimeException("Cannot find file upload acceptor for $dataType")
}

View File

@ -27,14 +27,14 @@ import net.corda.flows.*
*/
class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
override fun getData(id: SecureHash): SignedTransaction? {
return serviceHub.storageService.validatedTransactions.getTransaction(id)
return serviceHub.validatedTransactions.getTransaction(id)
}
}
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
override fun getData(id: SecureHash): ByteArray? {
return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
return serviceHub.attachments.openAttachment(id)?.open()?.readBytes()
}
}

View File

@ -1,16 +1,20 @@
package net.corda.node.services.api
import com.google.common.annotations.VisibleForTesting
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowStateMachine
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.NodeInfo
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.services.FileUploader
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.node.services.TransactionStorage
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.loggerFor
@ -58,34 +62,38 @@ sealed class NetworkCacheError : Exception() {
class DeregistrationFailed : NetworkCacheError()
}
abstract class ServiceHubInternal : PluginServiceHub {
interface ServiceHubInternal : PluginServiceHub {
companion object {
private val log = loggerFor<ServiceHubInternal>()
}
abstract val monitoringService: MonitoringService
abstract val schemaService: SchemaService
abstract override val networkMapCache: NetworkMapCacheInternal
abstract val schedulerService: SchedulerService
abstract val auditService: AuditService
abstract val rpcFlows: List<Class<out FlowLogic<*>>>
abstract val networkService: MessagingService
abstract val database: Database
abstract val configuration: NodeConfiguration
/**
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
* sends them to the vault for further processing. This is intended for implementations to call from
* [recordTransactions].
*
* @param txs The transactions to record.
* A map of hash->tx where tx has been signature/contract validated and the states are known to be correct.
* The signatures aren't technically needed after that point, but we keep them around so that we can relay
* the transaction data to other nodes that need it.
*/
internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable<SignedTransaction>) {
override val validatedTransactions: TransactionStorage
val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage
val monitoringService: MonitoringService
val schemaService: SchemaService
override val networkMapCache: NetworkMapCacheInternal
val schedulerService: SchedulerService
val auditService: AuditService
val rpcFlows: List<Class<out FlowLogic<*>>>
val networkService: MessagingService
val database: Database
val configuration: NodeConfiguration
@Suppress("DEPRECATION")
@Deprecated("This service will be removed in a future milestone")
val uploaders: List<FileUploader>
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
val recordedTransactions = txs.filter { writableStorageService.validatedTransactions.addTransaction(it) }
val recordedTransactions = txs.filter { validatedTransactions.addTransaction(it) }
if (stateMachineRunId != null) {
recordedTransactions.forEach {
storageService.stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id)
stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id)
}
} else {
log.warn("Transactions recorded from outside of a state machine")
@ -104,7 +112,7 @@ abstract class ServiceHubInternal : PluginServiceHub {
* Starts an already constructed flow. Note that you must be on the server thread to call this method.
* @param flowInitiator indicates who started the flow, see: [FlowInitiator].
*/
abstract fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T>
fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T>
/**
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow.
@ -124,5 +132,14 @@ abstract class ServiceHubInternal : PluginServiceHub {
return startFlow(logic, flowInitiator)
}
abstract fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>?
}
fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>?
}
/**
* This is the interface to storage storing state machine -> recorded tx mappings. Any time a transaction is recorded
* during a flow run [addMapping] should be called.
*/
interface StateMachineRecordedTransactionMappingStorage {
fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash)
fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping>
}

View File

@ -5,12 +5,11 @@ import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
import net.corda.node.utilities.*
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import rx.Observable
import rx.subjects.PublishSubject
import javax.annotation.concurrent.ThreadSafe

View File

@ -5,6 +5,7 @@ import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash
import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.TransactionStorage
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.node.utilities.*
import org.jetbrains.exposed.sql.ResultRow
@ -14,7 +15,7 @@ import rx.Observable
import rx.subjects.PublishSubject
import java.util.Collections.synchronizedMap
class DBTransactionStorage : TransactionStorage {
class DBTransactionStorage : TransactionStorage, SingletonSerializeAsToken() {
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}transactions") {
val txId = secureHash("tx_id")
val transaction = blob("transaction")
@ -59,7 +60,7 @@ class DBTransactionStorage : TransactionStorage {
}
}
val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
private val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction()
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {

View File

@ -5,9 +5,8 @@ import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import net.corda.core.node.services.StateMachineTransactionMapping
import rx.Observable
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
import rx.subjects.PublishSubject
import java.util.*
import javax.annotation.concurrent.ThreadSafe

View File

@ -14,10 +14,7 @@ import net.corda.core.div
import net.corda.core.extractZipFile
import net.corda.core.isDirectory
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationToken
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializeAsTokenContext
import net.corda.core.serialization.*
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.AcceptsFileUpload
import net.corda.node.services.database.RequeryConfiguration
@ -38,8 +35,11 @@ import javax.annotation.concurrent.ThreadSafe
* Stores attachments in H2 database.
*/
@ThreadSafe
class NodeAttachmentService(override var storePath: Path, dataSourceProperties: Properties, metrics: MetricRegistry) : AttachmentStorage, AcceptsFileUpload {
private val log = loggerFor<NodeAttachmentService>()
class NodeAttachmentService(override var storePath: Path, dataSourceProperties: Properties, metrics: MetricRegistry)
: AttachmentStorage, AcceptsFileUpload, SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<NodeAttachmentService>()
}
val configuration = RequeryConfiguration(dataSourceProperties)
val session = configuration.sessionForModel(Models.PERSISTENCE)

View File

@ -1,18 +0,0 @@
package net.corda.node.services.persistence
import net.corda.core.node.services.*
import net.corda.core.serialization.SingletonSerializeAsToken
open class StorageServiceImpl(override val attachments: AttachmentStorage,
override val validatedTransactions: TransactionStorage,
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage)
: SingletonSerializeAsToken(), TxWritableStorageService {
override val attachmentsClassLoaderEnabled = false
lateinit override var uploaders: List<FileUploader>
fun initUploaders(uploadersList: List<FileUploader>) {
@Suppress("DEPRECATION")
uploaders = uploadersList
}
}

View File

@ -205,7 +205,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction {
logger.debug { "waitForLedgerCommit($hash) ..." }
suspend(WaitForLedgerCommit(hash, sessionFlow.stateMachine as FlowStateMachineImpl<*>))
val stx = serviceHub.storageService.validatedTransactions.getTransaction(hash)
val stx = serviceHub.validatedTransactions.getTransaction(hash)
if (stx != null) {
logger.debug { "Transaction $hash committed to ledger" }
return stx

View File

@ -187,7 +187,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun listenToLedgerTransactions() {
// Observe the stream of committed, validated transactions and resume fibers that are waiting for them.
serviceHub.storageService.validatedTransactions.updates.subscribe { stx ->
serviceHub.validatedTransactions.updates.subscribe { stx ->
val hash = stx.id
val fibers: Set<FlowStateMachineImpl<*>> = mutex.locked { fibersWaitingForLedgerCommit.removeAll(hash) }
if (fibers.isNotEmpty()) {
@ -268,7 +268,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
if (waitingForResponse != null) {
if (waitingForResponse is WaitForLedgerCommit) {
val stx = database.transaction {
serviceHub.storageService.validatedTransactions.getTransaction(waitingForResponse.hash)
serviceHub.validatedTransactions.getTransaction(waitingForResponse.hash)
}
if (stx != null) {
fiber.logger.info("Resuming fiber as tx ${waitingForResponse.hash} has committed")
@ -548,7 +548,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun processWaitForCommitRequest(ioRequest: WaitForLedgerCommit) {
// Is it already committed?
val stx = database.transaction {
serviceHub.storageService.validatedTransactions.getTransaction(ioRequest.hash)
serviceHub.validatedTransactions.getTransaction(ioRequest.hash)
}
if (stx != null) {
resumeFiber(ioRequest.fiber)

View File

@ -72,10 +72,10 @@ public class VaultQueryJavaTests {
@Override
public void recordTransactions(@NotNull Iterable<SignedTransaction> txs) {
for (SignedTransaction stx : txs) {
getStorageService().getValidatedTransactions().addTransaction(stx);
getValidatedTransactions().addTransaction(stx);
}
Stream<WireTransaction> wtxn = StreamSupport.stream(txs.spliterator(), false).map(txn -> txn.getTx());
Stream<WireTransaction> wtxn = StreamSupport.stream(txs.spliterator(), false).map(SignedTransaction::getTx);
getVaultService().notifyAll(wtxn.collect(Collectors.toList()));
}
};

View File

@ -11,11 +11,10 @@ import net.corda.flows.FetchDataFlow
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.database.RequeryConfiguration
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.node.MockNetwork
import net.corda.node.utilities.transaction
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.makeTestDataSourceProperties
import org.jetbrains.exposed.sql.Database
import org.junit.Before
@ -61,7 +60,7 @@ class AttachmentTests {
// Insert an attachment into node zero's store directly.
val id = n0.database.transaction {
n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
n0.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
}
// Get node one to run a flow to fetch it and insert it.
@ -72,7 +71,7 @@ class AttachmentTests {
// Verify it was inserted into node one's store.
val attachment = n1.database.transaction {
n1.storage.attachments.openAttachment(id)!!
n1.attachments.openAttachment(id)!!
}
assertEquals(id, attachment.open().readBytes().sha256())
@ -108,7 +107,7 @@ class AttachmentTests {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
override fun start(): MockNetwork.MockNode {
super.start()
(storage.attachments as NodeAttachmentService).checkAttachmentsOnLoad = false
attachments.checkAttachmentsOnLoad = false
return this
}
}
@ -119,7 +118,7 @@ class AttachmentTests {
val attachment = fakeAttachment()
// Insert an attachment into node zero's store directly.
val id = n0.database.transaction {
n0.storage.attachments.importAttachment(ByteArrayInputStream(attachment))
n0.attachments.importAttachment(ByteArrayInputStream(attachment))
}
// Corrupt its store.
@ -130,7 +129,7 @@ class AttachmentTests {
corruptAttachment.attId = id
corruptAttachment.content = attachment
n0.database.transaction {
(n0.storage.attachments as NodeAttachmentService).session.update(corruptAttachment)
n0.attachments.session.update(corruptAttachment)
}

View File

@ -9,15 +9,21 @@ import net.corda.core.contracts.*
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sign
import net.corda.core.flows.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.*
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.TransactionStorage
import net.corda.core.node.services.Vault
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
@ -28,7 +34,6 @@ import net.corda.flows.TwoPartyTradeFlow.Seller
import net.corda.node.internal.AbstractNode
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.StorageServiceImpl
import net.corda.node.services.persistence.checkpoints
import net.corda.node.utilities.transaction
import net.corda.testing.*
@ -211,7 +216,7 @@ class TwoPartyTradeFlowTests {
assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1)
}
val storage = bobNode.storage.validatedTransactions
val storage = bobNode.services.validatedTransactions
val bobTransactionsBeforeCrash = bobNode.database.transaction {
(storage as DBTransactionStorage).transactions
}
@ -252,7 +257,9 @@ class TwoPartyTradeFlowTests {
}
bobNode.database.transaction {
val restoredBobTransactions = bobTransactionsBeforeCrash.filter { bobNode.storage.validatedTransactions.getTransaction(it.id) != null }
val restoredBobTransactions = bobTransactionsBeforeCrash.filter {
bobNode.services.validatedTransactions.getTransaction(it.id) != null
}
assertThat(restoredBobTransactions).containsAll(bobTransactionsBeforeCrash)
}
@ -276,13 +283,9 @@ class TwoPartyTradeFlowTests {
overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
// That constructs the storage service object in a customised way ...
override fun constructStorageService(
attachments: AttachmentStorage,
transactionStorage: TransactionStorage,
stateMachineRecordedTransactionMappingStorage: StateMachineRecordedTransactionMappingStorage
): StorageServiceImpl {
return StorageServiceImpl(attachments, RecordingTransactionStorage(database, transactionStorage), stateMachineRecordedTransactionMappingStorage)
// That constructs a recording tx storage
override fun createTransactionStorage(): TransactionStorage {
return RecordingTransactionStorage(database, super.createTransactionStorage())
}
}
}
@ -326,7 +329,7 @@ class TwoPartyTradeFlowTests {
mockNet.runNetwork()
run {
val records = (bobNode.storage.validatedTransactions as RecordingTransactionStorage).records
val records = (bobNode.services.validatedTransactions as RecordingTransactionStorage).records
// Check Bobs's database accesses as Bob's cash transactions are downloaded by Alice.
records.expectEvents(isStrict = false) {
sequence(
@ -344,7 +347,7 @@ class TwoPartyTradeFlowTests {
// Bob has downloaded the attachment.
bobNode.database.transaction {
bobNode.storage.attachments.openAttachment(attachmentID)!!.openAsJAR().use {
bobNode.services.attachments.openAttachment(attachmentID)!!.openAsJAR().use {
it.nextJarEntry
val contents = it.reader().readText()
assertTrue(contents.contains("Our commercial paper is top notch stuff"))
@ -354,7 +357,7 @@ class TwoPartyTradeFlowTests {
// And from Alice's perspective ...
run {
val records = (aliceNode.storage.validatedTransactions as RecordingTransactionStorage).records
val records = (aliceNode.services.validatedTransactions as RecordingTransactionStorage).records
records.expectEvents(isStrict = false) {
sequence(
// Seller Alice sends her seller info to Bob, who wants to check the asset for sale.
@ -422,8 +425,10 @@ class TwoPartyTradeFlowTests {
mockNet.runNetwork() // Clear network map registration messages
val aliceTxStream = aliceNode.storage.validatedTransactions.track().second
val aliceTxMappings = with(aliceNode) { database.transaction { storage.stateMachineRecordedTransactionMapping.track().second } }
val aliceTxStream = aliceNode.services.validatedTransactions.track().updates
val aliceTxMappings = with(aliceNode) {
database.transaction { services.stateMachineRecordedTransactionMapping.track().updates }
}
val aliceSmId = runBuyerAndSeller(notaryNode, aliceNode, bobNode,
"alice's paper".outputStateAndRef()).sellerId
@ -443,7 +448,7 @@ class TwoPartyTradeFlowTests {
)
aliceTxStream.expectEvents { aliceTxExpectations }
val aliceMappingExpectations = sequence(
expect { (stateMachineRunId, transactionId) ->
expect<StateMachineTransactionMapping> { (stateMachineRunId, transactionId) ->
require(stateMachineRunId == aliceSmId)
require(transactionId == bobsFakeCash[0].id)
},
@ -451,9 +456,9 @@ class TwoPartyTradeFlowTests {
require(stateMachineRunId == aliceSmId)
require(transactionId == bobsFakeCash[2].id)
},
expect { mapping: StateMachineTransactionMapping ->
require(mapping.stateMachineRunId == aliceSmId)
require(mapping.transactionId == bobsFakeCash[1].id)
expect { (stateMachineRunId, transactionId) ->
require(stateMachineRunId == aliceSmId)
require(transactionId == bobsFakeCash[1].id)
}
)
aliceTxMappings.expectEvents { aliceMappingExpectations }
@ -589,7 +594,7 @@ class TwoPartyTradeFlowTests {
}
return node.database.transaction {
node.services.recordTransactions(signed)
val validatedTransactions = node.services.storageService.validatedTransactions
val validatedTransactions = node.services.validatedTransactions
if (validatedTransactions is RecordingTransactionStorage) {
validatedTransactions.records.clear()
}

View File

@ -6,7 +6,6 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.*
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.serialization.NodeClock
import net.corda.node.services.api.*
@ -17,9 +16,11 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.testing.MOCK_IDENTITY_SERVICE
import net.corda.testing.node.MockAttachmentStorage
import net.corda.testing.node.MockNetworkMapCache
import net.corda.testing.node.MockStorageService
import org.jetbrains.exposed.sql.Database
import net.corda.testing.node.MockStateMachineRecordedTransactionMappingStorage
import net.corda.testing.node.MockTransactionStorage
import java.time.Clock
open class MockServiceHubInternal(
@ -28,13 +29,16 @@ open class MockServiceHubInternal(
val keyManagement: KeyManagementService? = null,
val network: MessagingService? = null,
val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
val storage: TxWritableStorageService? = MockStorageService(),
override val attachments: AttachmentStorage = MockAttachmentStorage(),
override val validatedTransactions: TransactionStorage = MockTransactionStorage(),
override val uploaders: List<FileUploader> = listOf<FileUploader>(),
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage = MockStateMachineRecordedTransactionMappingStorage(),
val mapCache: NetworkMapCacheInternal? = null,
val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(),
val schemas: SchemaService? = NodeSchemaService(),
val customTransactionVerifierService: TransactionVerifierService? = InMemoryTransactionVerifierService(2)
) : ServiceHubInternal() {
) : ServiceHubInternal {
override val vaultQueryService: VaultQueryService
get() = customVaultQuery ?: throw UnsupportedOperationException()
override val transactionVerifierService: TransactionVerifierService
@ -49,8 +53,6 @@ open class MockServiceHubInternal(
get() = network ?: throw UnsupportedOperationException()
override val networkMapCache: NetworkMapCacheInternal
get() = mapCache ?: MockNetworkMapCache(this)
override val storageService: StorageService
get() = storage ?: throw UnsupportedOperationException()
override val schedulerService: SchedulerService
get() = scheduler ?: throw UnsupportedOperationException()
override val clock: Clock
@ -67,14 +69,9 @@ open class MockServiceHubInternal(
override val schemaService: SchemaService
get() = schemas ?: throw UnsupportedOperationException()
override val auditService: AuditService = DummyAuditService()
// We isolate the storage service with writable TXes so that it can't be accessed except via recordTransactions()
private val txStorageService: TxWritableStorageService
get() = storage ?: throw UnsupportedOperationException()
lateinit var smm: StateMachineManager
override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T = throw UnsupportedOperationException()
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {

View File

@ -99,7 +99,7 @@ class NotaryChangeTests {
val newState = future.resultFuture.getOrThrow()
assertEquals(newState.state.notary, newNotary)
val notaryChangeTx = clientNodeA.services.storageService.validatedTransactions.getTransaction(newState.ref.txhash)!!.tx
val notaryChangeTx = clientNodeA.services.validatedTransactions.getTransaction(newState.ref.txhash)!!.tx
// Check that all encumbrances have been propagated to the outputs
val originalOutputs = issueTx.outputs.map { it.data }

View File

@ -85,7 +85,7 @@ class HibernateConfigurationTest {
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })

View File

@ -6,7 +6,6 @@ import net.corda.contracts.testing.fillWithSomeTestCash
import net.corda.core.contracts.*
import net.corda.core.identity.AnonymousParty
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.unconsumedStates
import net.corda.core.serialization.OpaqueBytes
@ -53,7 +52,7 @@ class NodeVaultServiceTest {
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
@ -77,17 +76,12 @@ class NodeVaultServiceTest {
val w1 = vaultSvc.unconsumedStates<Cash.State>()
assertThat(w1).hasSize(3)
val originalStorage = services.storageService
val originalVault = vaultSvc
val services2 = object : MockServices() {
override val vaultService: VaultService get() = originalVault
// We need to be able to find the same transactions as before, too.
override val storageService: TxWritableStorageService get() = originalStorage
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
validatedTransactions.addTransaction(stx)
vaultService.notify(stx.tx)
}
}

View File

@ -75,7 +75,7 @@ class VaultQueryTests {
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
@ -93,9 +93,9 @@ class VaultQueryTests {
/**
* Helper method for generating a Persistent H2 test database
*/
@Ignore //@Test
@Ignore
@Test
fun createPersistentTestDb() {
val dataSourceAndDatabase = configureDatabase(makePersistentDataSourceProperties())
val dataSource = dataSourceAndDatabase.first
val database = dataSourceAndDatabase.second

View File

@ -3,7 +3,9 @@ package net.corda.node.services.vault
import net.corda.contracts.DummyDealContract
import net.corda.contracts.asset.Cash
import net.corda.contracts.asset.DUMMY_CASH_ISSUER
import net.corda.contracts.testing.*
import net.corda.contracts.testing.fillWithSomeTestCash
import net.corda.contracts.testing.fillWithSomeTestDeals
import net.corda.contracts.testing.fillWithSomeTestLinearStates
import net.corda.core.contracts.*
import net.corda.core.identity.AnonymousParty
import net.corda.core.node.services.VaultService
@ -54,7 +56,7 @@ class VaultWithCashTest {
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })