mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Clean up of ServiceHubInternal, including how it's created in AbstractNode
This commit is contained in:
@ -5,8 +5,8 @@ import net.corda.core.contracts.DummyContract
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionType
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.map
|
||||
import net.corda.core.utilities.DUMMY_BANK_A
|
||||
import net.corda.flows.NotaryError
|
||||
@ -26,28 +26,28 @@ class RaftNotaryServiceTests : NodeBasedTest() {
|
||||
|
||||
@Test
|
||||
fun `detect double spend`() {
|
||||
val (masterNode, alice) = Futures.allAsList(
|
||||
startNotaryCluster(notaryName, 3).map { it.first() },
|
||||
startNode(DUMMY_BANK_A.name)
|
||||
val (bankA) = Futures.allAsList(
|
||||
startNode(DUMMY_BANK_A.name),
|
||||
startNotaryCluster(notaryName, 3).map { it.first() }
|
||||
).getOrThrow()
|
||||
|
||||
val notaryParty = alice.netMapCache.getNotary(notaryName)!!
|
||||
val notaryParty = bankA.services.networkMapCache.getNotary(notaryName)!!
|
||||
|
||||
val inputState = issueState(alice, notaryParty)
|
||||
val inputState = issueState(bankA, notaryParty)
|
||||
|
||||
val firstTxBuilder = TransactionType.General.Builder(notaryParty).withItems(inputState)
|
||||
val firstSpendTx = alice.services.signInitialTransaction(firstTxBuilder)
|
||||
val firstSpendTx = bankA.services.signInitialTransaction(firstTxBuilder)
|
||||
|
||||
val firstSpend = alice.services.startFlow(NotaryFlow.Client(firstSpendTx))
|
||||
val firstSpend = bankA.services.startFlow(NotaryFlow.Client(firstSpendTx))
|
||||
firstSpend.resultFuture.getOrThrow()
|
||||
|
||||
val secondSpendBuilder = TransactionType.General.Builder(notaryParty).withItems(inputState).run {
|
||||
val dummyState = DummyContract.SingleOwnerState(0, alice.info.legalIdentity)
|
||||
val dummyState = DummyContract.SingleOwnerState(0, bankA.info.legalIdentity)
|
||||
addOutputState(dummyState)
|
||||
this
|
||||
}
|
||||
val secondSpendTx = alice.services.signInitialTransaction(secondSpendBuilder)
|
||||
val secondSpend = alice.services.startFlow(NotaryFlow.Client(secondSpendTx))
|
||||
val secondSpendTx = bankA.services.signInitialTransaction(secondSpendBuilder)
|
||||
val secondSpend = bankA.services.startFlow(NotaryFlow.Client(secondSpendTx))
|
||||
|
||||
val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() }
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
@ -58,7 +58,7 @@ class RaftNotaryServiceTests : NodeBasedTest() {
|
||||
return node.database.transaction {
|
||||
val builder = DummyContract.generateInitial(Random().nextInt(), notary, node.info.legalIdentity.ref(0))
|
||||
val stx = node.services.signInitialTransaction(builder)
|
||||
node.services.recordTransactions(listOf(stx))
|
||||
node.services.recordTransactions(stx)
|
||||
StateAndRef(builder.outputStates().first(), StateRef(stx.id, 0))
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.*
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.node.services.NotaryService
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
@ -42,6 +41,7 @@ import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
import net.corda.node.services.network.InMemoryNetworkMapCache
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
|
||||
import net.corda.node.services.network.NetworkMapService.RegistrationResponse
|
||||
import net.corda.node.services.network.NodeRegistration
|
||||
import net.corda.node.services.network.PersistentNetworkMapService
|
||||
@ -122,78 +122,18 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
private val flowFactories = ConcurrentHashMap<Class<out FlowLogic<*>>, InitiatedFlowFactory<*>>()
|
||||
protected val partyKeys = mutableSetOf<KeyPair>()
|
||||
|
||||
val services = object : ServiceHubInternal {
|
||||
override val attachments: AttachmentStorage get() = this@AbstractNode.attachments
|
||||
override val uploaders: List<FileUploader> get() = this@AbstractNode.uploaders
|
||||
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage
|
||||
get() = this@AbstractNode.transactionMappings
|
||||
override val validatedTransactions: TransactionStorage get() = this@AbstractNode.transactions
|
||||
override val networkService: MessagingService get() = network
|
||||
override val networkMapCache: NetworkMapCacheInternal get() = netMapCache
|
||||
override val vaultService: VaultService get() = vault
|
||||
override val vaultQueryService: VaultQueryService get() = vaultQuery
|
||||
override val keyManagementService: KeyManagementService get() = keyManagement
|
||||
override val identityService: IdentityService get() = identity
|
||||
override val schedulerService: SchedulerService get() = scheduler
|
||||
override val clock: Clock get() = platformClock
|
||||
override val myInfo: NodeInfo get() = info
|
||||
override val schemaService: SchemaService get() = schemas
|
||||
override val transactionVerifierService: TransactionVerifierService get() = txVerifierService
|
||||
override val auditService: AuditService get() = this@AbstractNode.auditService
|
||||
override val database: Database get() = this@AbstractNode.database
|
||||
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
|
||||
|
||||
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
|
||||
require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" }
|
||||
return cordappServices.getInstance(type) ?: throw IllegalArgumentException("Corda service ${type.name} does not exist")
|
||||
}
|
||||
|
||||
override val rpcFlows: List<Class<out FlowLogic<*>>> get() = this@AbstractNode.rpcFlows
|
||||
|
||||
// Internal only
|
||||
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
|
||||
|
||||
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {
|
||||
return serverThread.fetchFrom { smm.add(logic, flowInitiator) }
|
||||
}
|
||||
|
||||
override fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>? {
|
||||
return flowFactories[initiatingFlowClass]
|
||||
}
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
database.transaction {
|
||||
super.recordTransactions(txs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
open fun findMyLocation(): WorldMapLocation? {
|
||||
return configuration.myLegalName.locationOrNull?.let { CityDatabase[it] }
|
||||
}
|
||||
val services: ServiceHubInternal get() = _services
|
||||
|
||||
private lateinit var _services: ServiceHubInternalImpl
|
||||
lateinit var info: NodeInfo
|
||||
lateinit var checkpointStorage: CheckpointStorage
|
||||
lateinit var smm: StateMachineManager
|
||||
lateinit var attachments: NodeAttachmentService
|
||||
lateinit var transactions: TransactionStorage
|
||||
lateinit var transactionMappings: StateMachineRecordedTransactionMappingStorage
|
||||
lateinit var uploaders: List<FileUploader>
|
||||
lateinit var vault: VaultService
|
||||
lateinit var vaultQuery: VaultQueryService
|
||||
lateinit var keyManagement: KeyManagementService
|
||||
var inNodeNetworkMapService: NetworkMapService? = null
|
||||
lateinit var txVerifierService: TransactionVerifierService
|
||||
lateinit var identity: IdentityService
|
||||
lateinit var network: MessagingService
|
||||
lateinit var netMapCache: NetworkMapCacheInternal
|
||||
lateinit var scheduler: NodeSchedulerService
|
||||
lateinit var schemas: SchemaService
|
||||
lateinit var auditService: AuditService
|
||||
protected val runOnStop = ArrayList<() -> Any?>()
|
||||
lateinit var database: Database
|
||||
protected var dbCloser: (() -> Any?)? = null
|
||||
private lateinit var rpcFlows: List<Class<out FlowLogic<*>>>
|
||||
|
||||
var isPreviousCheckpointsPresent = false
|
||||
private set
|
||||
@ -215,6 +155,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
/** The implementation of the [CordaRPCOps] interface used by this node. */
|
||||
open val rpcOps: CordaRPCOps by lazy { CordaRPCOpsImpl(services, smm, database) } // Lazy to avoid init ordering issue with the SMM.
|
||||
|
||||
open fun findMyLocation(): WorldMapLocation? {
|
||||
return configuration.myLegalName.locationOrNull?.let { CityDatabase[it] }
|
||||
}
|
||||
|
||||
open fun start(): AbstractNode {
|
||||
require(!started) { "Node has already been started" }
|
||||
|
||||
@ -233,8 +177,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
|
||||
// Do all of this in a database transaction so anything that might need a connection has one.
|
||||
initialiseDatabasePersistence {
|
||||
val keyStoreWrapper = KeyStoreWrapper(configuration.trustStoreFile, configuration.trustStorePassword)
|
||||
val tokenizableServices = makeServices(keyStoreWrapper)
|
||||
val tokenizableServices = makeServices()
|
||||
|
||||
smm = StateMachineManager(services,
|
||||
checkpointStorage,
|
||||
@ -266,9 +209,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
if (scanResult != null) {
|
||||
installCordaServices(scanResult)
|
||||
registerInitiatedFlows(scanResult)
|
||||
rpcFlows = findRPCFlows(scanResult)
|
||||
} else {
|
||||
rpcFlows = emptyList()
|
||||
findRPCFlows(scanResult)
|
||||
}
|
||||
|
||||
// TODO: Investigate having class path scanning find this flow
|
||||
@ -283,7 +224,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
smm.start()
|
||||
// Shut down the SMM so no Fibers are scheduled.
|
||||
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
|
||||
scheduler.start()
|
||||
_services.schedulerService.start()
|
||||
}
|
||||
started = true
|
||||
return this
|
||||
@ -301,7 +242,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class)
|
||||
scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class)
|
||||
.filter {
|
||||
val serviceType = getServiceType(it)
|
||||
if (serviceType != null && info.serviceIdentities(serviceType).isEmpty()) {
|
||||
@ -434,19 +375,21 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
return observable
|
||||
}
|
||||
|
||||
private fun findRPCFlows(scanResult: ScanResult): List<Class<out FlowLogic<*>>> {
|
||||
private fun findRPCFlows(scanResult: ScanResult) {
|
||||
fun Class<out FlowLogic<*>>.isUserInvokable(): Boolean {
|
||||
return isPublic(modifiers) && !isLocalClass && !isAnonymousClass && (!isMemberClass || isStatic(modifiers))
|
||||
}
|
||||
|
||||
return scanResult.getClassesWithAnnotation(FlowLogic::class, StartableByRPC::class).filter { it.isUserInvokable() } +
|
||||
// Add any core flows here
|
||||
listOf(
|
||||
ContractUpgradeFlow::class.java,
|
||||
// TODO Remove all Cash flows from default list once they are split into separate CorDapp.
|
||||
CashIssueFlow::class.java,
|
||||
CashExitFlow::class.java,
|
||||
CashPaymentFlow::class.java)
|
||||
_services.rpcFlows += scanResult
|
||||
.getClassesWithAnnotation(FlowLogic::class, StartableByRPC::class)
|
||||
.filter { it.isUserInvokable() } +
|
||||
// Add any core flows here
|
||||
listOf(
|
||||
ContractUpgradeFlow::class.java,
|
||||
// TODO Remove all Cash flows from default list once they are split into separate CorDapp.
|
||||
CashIssueFlow::class.java,
|
||||
CashExitFlow::class.java,
|
||||
CashPaymentFlow::class.java)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -476,36 +419,20 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
* Builds node internal, advertised, and plugin services.
|
||||
* Returns a list of tokenizable services to be added to the serialisation context.
|
||||
*/
|
||||
private fun makeServices(keyStoreWrapper: KeyStoreWrapper): MutableList<Any> {
|
||||
val keyStore = keyStoreWrapper.keyStore
|
||||
attachments = createAttachmentStorage()
|
||||
transactions = createTransactionStorage()
|
||||
transactionMappings = DBTransactionMappingStorage()
|
||||
private fun makeServices(): MutableList<Any> {
|
||||
checkpointStorage = DBCheckpointStorage()
|
||||
netMapCache = InMemoryNetworkMapCache(services)
|
||||
_services = ServiceHubInternalImpl()
|
||||
attachments = createAttachmentStorage()
|
||||
network = makeMessagingService()
|
||||
schemas = makeSchemaService()
|
||||
vault = makeVaultService(configuration.dataSourceProperties)
|
||||
vaultQuery = makeVaultQueryService(schemas)
|
||||
txVerifierService = makeTransactionVerifierService()
|
||||
auditService = DummyAuditService()
|
||||
|
||||
info = makeInfo()
|
||||
identity = makeIdentityService(keyStore.getCertificate(X509Utilities.CORDA_ROOT_CA)!! as X509Certificate,
|
||||
keyStoreWrapper.certificateAndKeyPair(X509Utilities.CORDA_CLIENT_CA),
|
||||
info.legalIdentityAndCert)
|
||||
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
|
||||
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
|
||||
// the identity key. But the infrastructure to make that easy isn't here yet.
|
||||
keyManagement = makeKeyManagementService(identity)
|
||||
scheduler = NodeSchedulerService(services, database, unfinishedSchedules = busyNodeLatch)
|
||||
|
||||
val tokenizableServices = mutableListOf(attachments, network, vault, vaultQuery, keyManagement, identity, platformClock, scheduler)
|
||||
val tokenizableServices = mutableListOf(attachments, network, services.vaultService, services.vaultQueryService,
|
||||
services.keyManagementService, services.identityService, platformClock, services.schedulerService)
|
||||
makeAdvertisedServices(tokenizableServices)
|
||||
return tokenizableServices
|
||||
}
|
||||
|
||||
protected open fun createTransactionStorage(): TransactionStorage = DBTransactionStorage()
|
||||
protected open fun makeTransactionStorage(): WritableTransactionStorage = DBTransactionStorage()
|
||||
|
||||
private fun scanCordapps(): ScanResult? {
|
||||
val scanPackage = System.getProperty("net.corda.node.cordapp.scan.package")
|
||||
@ -560,14 +487,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
private fun initUploaders() {
|
||||
uploaders = listOf(attachments) + cordappServices.values.filterIsInstance(AcceptsFileUpload::class.java)
|
||||
_services.uploaders += attachments
|
||||
cordappServices.values.filterIsInstanceTo(_services.uploaders, AcceptsFileUpload::class.java)
|
||||
}
|
||||
|
||||
private fun makeVaultObservers() {
|
||||
VaultSoftLockManager(vault, smm)
|
||||
VaultSoftLockManager(services.vaultService, smm)
|
||||
CashBalanceAsMetricsObserver(services, database)
|
||||
ScheduledActivityObserver(services)
|
||||
HibernateObserver(vault.rawUpdates, HibernateConfiguration(schemas))
|
||||
HibernateObserver(services.vaultService.rawUpdates, HibernateConfiguration(services.schemaService))
|
||||
}
|
||||
|
||||
private fun makeInfo(): NodeInfo {
|
||||
@ -684,7 +612,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
|
||||
val reg = NodeRegistration(info, instant.toEpochMilli(), ADD, expires)
|
||||
val legalIdentityKey = obtainLegalIdentityKey()
|
||||
val request = NetworkMapService.RegistrationRequest(reg.toWire(keyManagement, legalIdentityKey.public), network.myAddress)
|
||||
val request = RegistrationRequest(reg.toWire(services.keyManagementService, legalIdentityKey.public), network.myAddress)
|
||||
return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapAddress)
|
||||
}
|
||||
|
||||
@ -735,7 +663,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
.toTypedArray()
|
||||
val service = InMemoryIdentityService(setOf(info.legalIdentityAndCert), trustRoot = trustRoot, caCertificates = *caCertificates)
|
||||
services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.legalIdentityAndCert) }
|
||||
netMapCache.changed.subscribe { mapChange ->
|
||||
services.networkMapCache.changed.subscribe { mapChange ->
|
||||
// TODO how should we handle network map removal
|
||||
if (mapChange is MapChange.Added) {
|
||||
service.registerIdentity(mapChange.node.legalIdentityAndCert)
|
||||
@ -744,13 +672,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
return service
|
||||
}
|
||||
|
||||
// TODO: sort out ordering of open & protected modifiers of functions in this class.
|
||||
protected open fun makeVaultService(dataSourceProperties: Properties): VaultService = NodeVaultService(services, dataSourceProperties)
|
||||
|
||||
protected open fun makeVaultQueryService(schemas: SchemaService): VaultQueryService = HibernateVaultQueryImpl(HibernateConfiguration(schemas), vault.updatesPublisher)
|
||||
|
||||
protected open fun makeSchemaService(): SchemaService = NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet())
|
||||
|
||||
protected abstract fun makeTransactionVerifierService(): TransactionVerifierService
|
||||
|
||||
open fun stop() {
|
||||
@ -844,6 +765,60 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
val attachmentsDir = (configuration.baseDirectory / "attachments").createDirectories()
|
||||
return NodeAttachmentService(attachmentsDir, configuration.dataSourceProperties, services.monitoringService.metrics)
|
||||
}
|
||||
|
||||
private inner class ServiceHubInternalImpl : ServiceHubInternal, SingletonSerializeAsToken() {
|
||||
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
|
||||
override val uploaders = ArrayList<FileUploader>()
|
||||
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage()
|
||||
override val auditService = DummyAuditService()
|
||||
override val monitoringService = MonitoringService(MetricRegistry())
|
||||
override val validatedTransactions = makeTransactionStorage()
|
||||
override val transactionVerifierService by lazy { makeTransactionVerifierService() }
|
||||
override val networkMapCache by lazy { InMemoryNetworkMapCache(this) }
|
||||
override val vaultService by lazy { NodeVaultService(this, configuration.dataSourceProperties) }
|
||||
override val vaultQueryService by lazy {
|
||||
HibernateVaultQueryImpl(HibernateConfiguration(schemaService), vaultService.updatesPublisher)
|
||||
}
|
||||
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
|
||||
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
|
||||
// the identity key. But the infrastructure to make that easy isn't here yet.
|
||||
override val keyManagementService by lazy { makeKeyManagementService(identityService) }
|
||||
override val schedulerService by lazy { NodeSchedulerService(this, unfinishedSchedules = busyNodeLatch) }
|
||||
override val identityService by lazy {
|
||||
val keyStoreWrapper = KeyStoreWrapper(configuration.trustStoreFile, configuration.trustStorePassword)
|
||||
makeIdentityService(
|
||||
keyStoreWrapper.keyStore.getCertificate(X509Utilities.CORDA_ROOT_CA)!! as X509Certificate,
|
||||
keyStoreWrapper.certificateAndKeyPair(X509Utilities.CORDA_CLIENT_CA),
|
||||
info.legalIdentityAndCert)
|
||||
}
|
||||
override val attachments: AttachmentStorage get() = this@AbstractNode.attachments
|
||||
override val networkService: MessagingService get() = network
|
||||
override val clock: Clock get() = platformClock
|
||||
override val myInfo: NodeInfo get() = info
|
||||
override val schemaService by lazy { NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet()) }
|
||||
override val database: Database get() = this@AbstractNode.database
|
||||
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
|
||||
|
||||
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
|
||||
require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" }
|
||||
return cordappServices.getInstance(type) ?: throw IllegalArgumentException("Corda service ${type.name} does not exist")
|
||||
}
|
||||
|
||||
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {
|
||||
return serverThread.fetchFrom { smm.add(logic, flowInitiator) }
|
||||
}
|
||||
|
||||
override fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>? {
|
||||
return flowFactories[initiatingFlowClass]
|
||||
}
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
database.transaction {
|
||||
super.recordTransactions(txs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class KeyStoreWrapper(val keyStore: KeyStore, val storePath: Path, private val storePassword: String) {
|
||||
|
@ -72,7 +72,7 @@ interface ServiceHubInternal : PluginServiceHub {
|
||||
* The signatures aren't technically needed after that point, but we keep them around so that we can relay
|
||||
* the transaction data to other nodes that need it.
|
||||
*/
|
||||
override val validatedTransactions: TransactionStorage
|
||||
override val validatedTransactions: WritableTransactionStorage
|
||||
val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage
|
||||
val monitoringService: MonitoringService
|
||||
val schemaService: SchemaService
|
||||
@ -89,8 +89,9 @@ interface ServiceHubInternal : PluginServiceHub {
|
||||
val uploaders: List<FileUploader>
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
|
||||
val recordedTransactions = txs.filter { validatedTransactions.addTransaction(it) }
|
||||
require(recordedTransactions.isNotEmpty()) { "No transactions passed in for recording" }
|
||||
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
|
||||
if (stateMachineRunId != null) {
|
||||
recordedTransactions.forEach {
|
||||
stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id)
|
||||
@ -135,6 +136,20 @@ interface ServiceHubInternal : PluginServiceHub {
|
||||
fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>?
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread-safe storage of transactions.
|
||||
*/
|
||||
interface WritableTransactionStorage : TransactionStorage {
|
||||
/**
|
||||
* Add a new transaction to the store. If the store already has a transaction with the same id it will be
|
||||
* overwritten.
|
||||
* @param transaction The transaction to be recorded.
|
||||
* @return true if the transaction was recorded successfully, false if it was already recorded.
|
||||
*/
|
||||
// TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry.
|
||||
fun addTransaction(transaction: SignedTransaction): Boolean
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the interface to storage storing state machine -> recorded tx mappings. Any time a transaction is recorded
|
||||
* during a flow run [addMapping] should be called.
|
||||
|
@ -17,7 +17,6 @@ import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
|
||||
import net.corda.node.utilities.*
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.time.Instant
|
||||
@ -43,7 +42,6 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeSchedulerService(private val services: ServiceHubInternal,
|
||||
private val database: Database,
|
||||
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
|
||||
private val unfinishedSchedules: ReusableLatch = ReusableLatch())
|
||||
: SchedulerService, SingletonSerializeAsToken() {
|
||||
@ -159,7 +157,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
|
||||
}
|
||||
|
||||
private fun onTimeReached(scheduledState: ScheduledStateRef) {
|
||||
database.transaction {
|
||||
services.database.transaction {
|
||||
val scheduledFlow = getScheduledFlow(scheduledState)
|
||||
if (scheduledFlow != null) {
|
||||
// TODO Because the flow is executed asynchronously, there is a small window between this tx we're in
|
||||
|
@ -4,9 +4,9 @@ import com.google.common.annotations.VisibleForTesting
|
||||
import net.corda.core.bufferUntilSubscribed
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.services.TransactionStorage
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.utilities.*
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.exposedLogger
|
||||
@ -15,7 +15,7 @@ import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.util.Collections.synchronizedMap
|
||||
|
||||
class DBTransactionStorage : TransactionStorage, SingletonSerializeAsToken() {
|
||||
class DBTransactionStorage : WritableTransactionStorage, SingletonSerializeAsToken() {
|
||||
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}transactions") {
|
||||
val txId = secureHash("tx_id")
|
||||
val transaction = blob("transaction")
|
||||
|
@ -8,10 +8,7 @@ import com.google.common.hash.HashingInputStream
|
||||
import com.google.common.io.CountingInputStream
|
||||
import net.corda.core.contracts.AbstractAttachment
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.createDirectory
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.div
|
||||
import net.corda.core.extractZipFile
|
||||
import net.corda.core.isDirectory
|
||||
import net.corda.core.node.services.AttachmentStorage
|
||||
import net.corda.core.serialization.*
|
||||
@ -35,7 +32,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* Stores attachments in H2 database.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeAttachmentService(override var storePath: Path, dataSourceProperties: Properties, metrics: MetricRegistry)
|
||||
class NodeAttachmentService(val storePath: Path, dataSourceProperties: Properties, metrics: MetricRegistry)
|
||||
: AttachmentStorage, AcceptsFileUpload, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = loggerFor<NodeAttachmentService>()
|
||||
@ -48,7 +45,6 @@ class NodeAttachmentService(override var storePath: Path, dataSourceProperties:
|
||||
var checkAttachmentsOnLoad = true
|
||||
|
||||
private val attachmentCount = metrics.counter("Attachments")
|
||||
@Volatile override var automaticallyExtractAttachments = false
|
||||
|
||||
init {
|
||||
require(storePath.isDirectory()) { "$storePath must be a directory" }
|
||||
@ -183,19 +179,6 @@ class NodeAttachmentService(override var storePath: Path, dataSourceProperties:
|
||||
|
||||
log.info("Stored new attachment $id")
|
||||
|
||||
if (automaticallyExtractAttachments) {
|
||||
val extractTo = storePath / "$id.jar"
|
||||
try {
|
||||
extractTo.createDirectory()
|
||||
extractZipFile(ByteArrayInputStream(bytes), extractTo)
|
||||
} catch(e: FileAlreadyExistsException) {
|
||||
log.trace("Did not extract attachment jar to directory because it already exists")
|
||||
} catch(e: Exception) {
|
||||
log.error("Failed to extract attachment jar $id, ", e)
|
||||
// TODO: Delete the extractTo directory here.
|
||||
}
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,6 @@ import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.messaging.StateMachineTransactionMapping
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.node.services.TransactionStorage
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
@ -32,6 +31,7 @@ import net.corda.core.utilities.*
|
||||
import net.corda.flows.TwoPartyTradeFlow.Buyer
|
||||
import net.corda.flows.TwoPartyTradeFlow.Seller
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.persistence.checkpoints
|
||||
@ -153,7 +153,7 @@ class TwoPartyTradeFlowTests {
|
||||
val cashLockId = UUID.randomUUID()
|
||||
bobNode.database.transaction {
|
||||
// lock the cash states with an arbitrary lockId (to prevent the Buyer flow from claiming the states)
|
||||
bobNode.vault.softLockReserve(cashLockId, cashStates.states.map { it.ref }.toSet())
|
||||
bobNode.services.vaultService.softLockReserve(cashLockId, cashStates.states.map { it.ref }.toSet())
|
||||
}
|
||||
|
||||
val (bobStateMachine, aliceResult) = runBuyerAndSeller(notaryNode, aliceNode, bobNode,
|
||||
@ -284,8 +284,8 @@ class TwoPartyTradeFlowTests {
|
||||
entropyRoot: BigInteger): MockNetwork.MockNode {
|
||||
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
|
||||
// That constructs a recording tx storage
|
||||
override fun createTransactionStorage(): TransactionStorage {
|
||||
return RecordingTransactionStorage(database, super.createTransactionStorage())
|
||||
override fun makeTransactionStorage(): WritableTransactionStorage {
|
||||
return RecordingTransactionStorage(database, super.makeTransactionStorage())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -311,7 +311,7 @@ class TwoPartyTradeFlowTests {
|
||||
attachment(ByteArrayInputStream(stream.toByteArray()))
|
||||
}
|
||||
|
||||
val extraKey = bobNode.keyManagement.keys.single()
|
||||
val extraKey = bobNode.services.keyManagementService.keys.single()
|
||||
val bobsFakeCash = fillUpForBuyer(false, AnonymousParty(extraKey),
|
||||
DUMMY_CASH_ISSUER.party,
|
||||
notaryNode.info.notaryIdentity).second
|
||||
@ -410,7 +410,7 @@ class TwoPartyTradeFlowTests {
|
||||
attachment(ByteArrayInputStream(stream.toByteArray()))
|
||||
}
|
||||
|
||||
val bobsKey = bobNode.keyManagement.keys.single()
|
||||
val bobsKey = bobNode.services.keyManagementService.keys.single()
|
||||
val bobsFakeCash = fillUpForBuyer(false, AnonymousParty(bobsKey),
|
||||
DUMMY_CASH_ISSUER.party,
|
||||
notaryNode.info.notaryIdentity).second
|
||||
@ -675,7 +675,7 @@ class TwoPartyTradeFlowTests {
|
||||
}
|
||||
|
||||
|
||||
class RecordingTransactionStorage(val database: Database, val delegate: TransactionStorage) : TransactionStorage {
|
||||
class RecordingTransactionStorage(val database: Database, val delegate: WritableTransactionStorage) : WritableTransactionStorage {
|
||||
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
|
||||
return database.transaction {
|
||||
delegate.track()
|
||||
|
@ -18,19 +18,20 @@ import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.testing.MOCK_IDENTITY_SERVICE
|
||||
import net.corda.testing.node.MockAttachmentStorage
|
||||
import net.corda.testing.node.MockNetworkMapCache
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import net.corda.testing.node.MockStateMachineRecordedTransactionMappingStorage
|
||||
import net.corda.testing.node.MockTransactionStorage
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.time.Clock
|
||||
|
||||
open class MockServiceHubInternal(
|
||||
override val database: Database,
|
||||
val customVault: VaultService? = null,
|
||||
val customVaultQuery: VaultQueryService? = null,
|
||||
val keyManagement: KeyManagementService? = null,
|
||||
val network: MessagingService? = null,
|
||||
val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
|
||||
override val attachments: AttachmentStorage = MockAttachmentStorage(),
|
||||
override val validatedTransactions: TransactionStorage = MockTransactionStorage(),
|
||||
override val validatedTransactions: WritableTransactionStorage = MockTransactionStorage(),
|
||||
override val uploaders: List<FileUploader> = listOf<FileUploader>(),
|
||||
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage = MockStateMachineRecordedTransactionMappingStorage(),
|
||||
val mapCache: NetworkMapCacheInternal? = null,
|
||||
@ -59,8 +60,6 @@ open class MockServiceHubInternal(
|
||||
get() = overrideClock ?: throw UnsupportedOperationException()
|
||||
override val myInfo: NodeInfo
|
||||
get() = throw UnsupportedOperationException()
|
||||
override val database: Database
|
||||
get() = throw UnsupportedOperationException()
|
||||
override val configuration: NodeConfiguration
|
||||
get() = throw UnsupportedOperationException()
|
||||
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
|
||||
|
@ -135,7 +135,7 @@ class NotaryChangeTests {
|
||||
addOutputState(stateB, notary, encumbrance = 1) // Encumbered by stateC
|
||||
}
|
||||
val stx = node.services.signInitialTransaction(tx)
|
||||
node.services.recordTransactions(listOf(stx))
|
||||
node.services.recordTransactions(stx)
|
||||
return tx.toWireTransaction()
|
||||
}
|
||||
|
||||
@ -152,7 +152,7 @@ fun issueState(node: AbstractNode, notaryNode: AbstractNode): StateAndRef<*> {
|
||||
val tx = DummyContract.generateInitial(Random().nextInt(), notaryNode.info.notaryIdentity, node.info.legalIdentity.ref(0))
|
||||
val signedByNode = node.services.signInitialTransaction(tx)
|
||||
val stx = notaryNode.services.addSignature(signedByNode, notaryNode.services.notaryIdentityKey)
|
||||
node.services.recordTransactions(listOf(stx))
|
||||
node.services.recordTransactions(stx)
|
||||
return StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
|
||||
}
|
||||
|
||||
@ -163,8 +163,8 @@ fun issueMultiPartyState(nodeA: AbstractNode, nodeB: AbstractNode, notaryNode: A
|
||||
val signedByA = nodeA.services.signInitialTransaction(tx)
|
||||
val signedByAB = nodeB.services.addSignature(signedByA)
|
||||
val stx = notaryNode.services.addSignature(signedByAB, notaryNode.services.notaryIdentityKey)
|
||||
nodeA.services.recordTransactions(listOf(stx))
|
||||
nodeB.services.recordTransactions(listOf(stx))
|
||||
nodeA.services.recordTransactions(stx)
|
||||
nodeB.services.recordTransactions(stx)
|
||||
val stateAndRef = StateAndRef(state, StateRef(stx.id, 0))
|
||||
return stateAndRef
|
||||
}
|
||||
@ -173,6 +173,6 @@ fun issueInvalidState(node: AbstractNode, notary: Party): StateAndRef<*> {
|
||||
val tx = DummyContract.generateInitial(Random().nextInt(), notary, node.info.legalIdentity.ref(0))
|
||||
tx.addTimeWindow(Instant.now(), 30.seconds)
|
||||
val stx = node.services.signInitialTransaction(tx)
|
||||
node.services.recordTransactions(listOf(stx))
|
||||
node.services.recordTransactions(stx)
|
||||
return StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
|
||||
}
|
||||
|
@ -87,11 +87,11 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
InMemoryMessagingNetwork.PeerHandle(0, nullIdentity),
|
||||
AffinityExecutor.ServiceAffinityExecutor("test", 1),
|
||||
database)
|
||||
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, network = mockMessagingService), TestReference {
|
||||
services = object : MockServiceHubInternal(database, overrideClock = testClock, keyManagement = kms, network = mockMessagingService), TestReference {
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
|
||||
override val testReference = this@NodeSchedulerServiceTest
|
||||
}
|
||||
scheduler = NodeSchedulerService(services, database, schedulerGatedExecutor)
|
||||
scheduler = NodeSchedulerService(services, schedulerGatedExecutor)
|
||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
||||
val mockSMM = StateMachineManager(services, DBCheckpointStorage(), smmExecutor, database)
|
||||
mockSMM.changes.subscribe { change ->
|
||||
|
@ -38,13 +38,13 @@ class InMemoryNetworkMapCacheTest {
|
||||
mockNet.runNetwork()
|
||||
|
||||
// Node A currently knows only about itself, so this returns node A
|
||||
assertEquals(nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeA.info)
|
||||
assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeA.info)
|
||||
|
||||
nodeA.database.transaction {
|
||||
nodeA.netMapCache.addNode(nodeB.info)
|
||||
nodeA.services.networkMapCache.addNode(nodeB.info)
|
||||
}
|
||||
// The details of node B write over those for node A
|
||||
assertEquals(nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeB.info)
|
||||
assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeB.info)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -131,7 +131,7 @@ class NotaryServiceTests {
|
||||
val tx = DummyContract.generateInitial(Random().nextInt(), notaryNode.info.notaryIdentity, node.info.legalIdentity.ref(0))
|
||||
val signedByNode = node.services.signInitialTransaction(tx)
|
||||
val stx = notaryNode.services.addSignature(signedByNode, notaryNode.services.notaryIdentityKey)
|
||||
node.services.recordTransactions(listOf(stx))
|
||||
node.services.recordTransactions(stx)
|
||||
return StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
|
||||
}
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ class ValidatingNotaryServiceTests {
|
||||
val tx = DummyContract.generateInitial(Random().nextInt(), notaryNode.info.notaryIdentity, node.info.legalIdentity.ref(0))
|
||||
val signedByNode = node.services.signInitialTransaction(tx)
|
||||
val stx = notaryNode.services.addSignature(signedByNode, notaryNode.services.notaryIdentityKey)
|
||||
node.services.recordTransactions(listOf(stx))
|
||||
node.services.recordTransactions(stx)
|
||||
return StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
|
||||
}
|
||||
}
|
||||
|
@ -405,7 +405,7 @@ class NodeVaultServiceTest {
|
||||
}
|
||||
val usefulTX = megaCorpServices.signInitialTransaction(usefulBuilder)
|
||||
|
||||
services.recordTransactions(listOf(usefulTX))
|
||||
services.recordTransactions(usefulTX)
|
||||
|
||||
vaultSvc.addNoteToTransaction(usefulTX.id, "USD Sample Note 1")
|
||||
vaultSvc.addNoteToTransaction(usefulTX.id, "USD Sample Note 2")
|
||||
@ -418,7 +418,7 @@ class NodeVaultServiceTest {
|
||||
}
|
||||
val anotherTX = megaCorpServices.signInitialTransaction(anotherBuilder)
|
||||
|
||||
services.recordTransactions(listOf(anotherTX))
|
||||
services.recordTransactions(anotherTX)
|
||||
|
||||
vaultSvc.addNoteToTransaction(anotherTX.id, "GPB Sample Note 1")
|
||||
assertEquals(1, vaultSvc.getTransactionNotes(anotherTX.id).count())
|
||||
|
Reference in New Issue
Block a user