diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index a595a7204a..82b39fa383 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -42,6 +42,10 @@ import net.corda.node.services.events.ScheduledActivityObserver import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.MessagingService +import net.corda.node.services.network.NetworkMapCacheImpl +import net.corda.node.services.network.NodeInfoWatcher +import net.corda.node.services.network.PersistentNetworkMapCache +import net.corda.node.services.persistence.* import net.corda.node.services.network.* import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBTransactionMappingStorage @@ -130,7 +134,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected lateinit var attachments: NodeAttachmentService protected lateinit var network: MessagingService protected val runOnStop = ArrayList<() -> Any?>() - protected lateinit var database: CordaPersistence protected val _nodeReadyFuture = openFuture() protected val networkMapClient: NetworkMapClient? by lazy { configuration.compatibilityZoneURL?.let(::NetworkMapClient) } @@ -153,7 +156,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, @Volatile private var _started: StartedNode? = null /** The implementation of the [CordaRPCOps] interface used by this node. */ - open fun makeRPCOps(flowStarter: FlowStarter): CordaRPCOps { + open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence): CordaRPCOps { return SecureCordaRPCOps(services, smm, database, flowStarter) } @@ -185,16 +188,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration, readNetworkParameters() val schemaService = NodeSchemaService(cordappLoader) // Do all of this in a database transaction so anything that might need a connection has one. - val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService) { - val transactionStorage = makeTransactionStorage() + val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService) { database -> + val transactionStorage = makeTransactionStorage(database) val stateLoader = StateLoaderImpl(transactionStorage) - val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader) - val notaryService = makeNotaryService(nodeServices) - smm = makeStateMachineManager() + val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader, database) + val notaryService = makeNotaryService(nodeServices, database) + smm = makeStateMachineManager(database) val flowStarter = FlowStarterImpl(serverThread, smm) val schedulerService = NodeSchedulerService( platformClock, - this@AbstractNode.database, + database, flowStarter, stateLoader, unfinishedSchedules = busyNodeLatch, @@ -207,8 +210,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS) } } - makeVaultObservers(schedulerService) - val rpcOps = makeRPCOps(flowStarter) + makeVaultObservers(schedulerService, database.hibernateConfig) + val rpcOps = makeRPCOps(flowStarter, database) startMessagingService(rpcOps) installCoreFlows() val cordaServices = installCordaServices(flowStarter) @@ -272,8 +275,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } protected abstract fun myAddresses(): List - - protected open fun makeStateMachineManager(): StateMachineManager { + protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager { return StateMachineManagerImpl( services, checkpointStorage, @@ -493,7 +495,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, * Builds node internal, advertised, and plugin services. * Returns a list of tokenizable services to be added to the serialisation context. */ - private fun makeServices(keyPairs: Set, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, stateLoader: StateLoader): MutableList { + private fun makeServices(keyPairs: Set, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, stateLoader: StateLoader, database: CordaPersistence): MutableList { checkpointStorage = DBCheckpointStorage() val metrics = MetricRegistry() attachments = NodeAttachmentService(metrics) @@ -507,8 +509,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, transactionStorage, stateLoader, MonitoringService(metrics), - cordappProvider) - network = makeMessagingService() + cordappProvider, + database) + network = makeMessagingService(database) val tokenizableServices = mutableListOf(attachments, network, services.vaultService, services.keyManagementService, services.identityService, platformClock, services.auditService, services.monitoringService, services.networkMapCache, services.schemaService, @@ -517,12 +520,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration, return tokenizableServices } - protected open fun makeTransactionStorage(): WritableTransactionStorage = DBTransactionStorage() - - private fun makeVaultObservers(schedulerService: SchedulerService) { + protected open fun makeTransactionStorage(database: CordaPersistence): WritableTransactionStorage = DBTransactionStorage() + private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration) { VaultSoftLockManager.install(services.vaultService, smm) ScheduledActivityObserver.install(services.vaultService, schedulerService) - HibernateObserver.install(services.vaultService.rawUpdates, database.hibernateConfig) + HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig) } @VisibleForTesting @@ -551,26 +553,26 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Specific class so that MockNode can catch it. class DatabaseConfigurationException(msg: String) : CordaException(msg) - protected open fun initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: () -> T): T { + protected open fun initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: (CordaPersistence) -> T): T { val props = configuration.dataSourceProperties if (props.isNotEmpty()) { - this.database = configureDatabase(props, configuration.database, { _services.identityService }, schemaService) + val database = configureDatabase(props, configuration.database, { _services.identityService }, schemaService) // Now log the vendor string as this will also cause a connection to be tested eagerly. database.transaction { log.info("Connected to ${database.dataSource.connection.metaData.databaseProductName} database.") } runOnStop += database::close return database.transaction { - insideTransaction() + insideTransaction(database) } } else { throw DatabaseConfigurationException("There must be a database configured.") } } - private fun makeNotaryService(tokenizableServices: MutableList): NotaryService? { + private fun makeNotaryService(tokenizableServices: MutableList, database: CordaPersistence): NotaryService? { return configuration.notary?.let { - makeCoreNotaryService(it).also { + makeCoreNotaryService(it, database).also { tokenizableServices.add(it) runOnStop += it::stop installCoreFlow(NotaryFlow.Client::class, it::createServiceFlow) @@ -598,24 +600,17 @@ abstract class AbstractNode(val configuration: NodeConfiguration, check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) { "Node is too old for the network" } } - private fun makeCoreNotaryService(notaryConfig: NotaryConfig): NotaryService { + private fun makeCoreNotaryService(notaryConfig: NotaryConfig, database: CordaPersistence): NotaryService { val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service") - return if (notaryConfig.validating) { - if (notaryConfig.raft != null) { - RaftValidatingNotaryService(services, notaryKey, notaryConfig.raft) - } else if (notaryConfig.bftSMaRt != null) { - throw IllegalArgumentException("Validating BFTSMaRt notary not supported") + return notaryConfig.run { + if (raft != null) { + val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft) + (if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider) + } else if (bftSMaRt != null) { + if (validating) throw IllegalArgumentException("Validating BFTSMaRt notary not supported") + BFTNonValidatingNotaryService(services, notaryKey, bftSMaRt, makeBFTCluster(notaryKey, bftSMaRt)) } else { - ValidatingNotaryService(services, notaryKey) - } - } else { - if (notaryConfig.raft != null) { - RaftNonValidatingNotaryService(services, notaryKey, notaryConfig.raft) - } else if (notaryConfig.bftSMaRt != null) { - val cluster = makeBFTCluster(notaryKey, notaryConfig.bftSMaRt) - BFTNonValidatingNotaryService(services, notaryKey, notaryConfig.bftSMaRt, cluster) - } else { - SimpleNotaryService(services, notaryKey) + (if (validating) ::ValidatingNotaryService else ::SimpleNotaryService)(services, notaryKey) } } } @@ -657,8 +652,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, _started = null } - protected abstract fun makeMessagingService(): MessagingService - + protected abstract fun makeMessagingService(database: CordaPersistence): MessagingService protected abstract fun startMessagingService(rpcOps: RPCOps) private fun obtainIdentity(notaryConfig: NotaryConfig?): Pair { @@ -716,8 +710,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } protected open fun generateKeyPair() = cryptoGenerateKeyPair() - protected open fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader): VaultServiceInternal { - return NodeVaultService(platformClock, keyManagementService, stateLoader, database.hibernateConfig) + protected open fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader, hibernateConfig: HibernateConfiguration): VaultServiceInternal { + return NodeVaultService(platformClock, keyManagementService, stateLoader, hibernateConfig) } private inner class ServiceHubInternalImpl( @@ -730,7 +724,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, override val validatedTransactions: WritableTransactionStorage, private val stateLoader: StateLoader, override val monitoringService: MonitoringService, - override val cordappProvider: CordappProviderInternal + override val cordappProvider: CordappProviderInternal, + override val database: CordaPersistence ) : SingletonSerializeAsToken(), ServiceHubInternal, StateLoader by stateLoader { override val rpcFlows = ArrayList>>() override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage() @@ -739,18 +734,17 @@ abstract class AbstractNode(val configuration: NodeConfiguration, override val networkMapCache by lazy { NetworkMapCacheImpl( PersistentNetworkMapCache( - this@AbstractNode.database, + database, networkParameters.notaries), identityService) } - override val vaultService by lazy { makeVaultService(keyManagementService, stateLoader) } + override val vaultService by lazy { makeVaultService(keyManagementService, stateLoader, database.hibernateConfig) } override val contractUpgradeService by lazy { ContractUpgradeServiceImpl() } override val attachments: AttachmentStorage get() = this@AbstractNode.attachments override val networkService: MessagingService get() = network override val clock: Clock get() = platformClock override val myInfo: NodeInfo get() = info override val myNodeStateObservable: Observable get() = nodeStateObservable - override val database: CordaPersistence get() = this@AbstractNode.database override val configuration: NodeConfiguration get() = this@AbstractNode.configuration override fun cordaService(type: Class): T { require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 349ef97121..837c7ad133 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -25,6 +25,7 @@ import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.utilities.AddressUtils import net.corda.node.utilities.AffinityExecutor +import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.DemoClock import net.corda.nodeapi.internal.ShutdownHook import net.corda.nodeapi.internal.addShutdownHook @@ -128,8 +129,7 @@ open class Node(configuration: NodeConfiguration, private var shutdownHook: ShutdownHook? = null private lateinit var userService: RPCUserService - - override fun makeMessagingService(): MessagingService { + override fun makeMessagingService(database: CordaPersistence): MessagingService { userService = RPCUserServiceImpl(configuration.rpcUsers) val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker() @@ -214,7 +214,7 @@ open class Node(configuration: NodeConfiguration, * This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details * on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url */ - override fun initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: () -> T): T { + override fun initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: (CordaPersistence) -> T): T { val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url") val h2Prefix = "jdbc:h2:file:" if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index c6aeebe62d..4ae2639b38 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -77,8 +77,8 @@ class NodeMessagingClient(override val config: NodeConfiguration, private val serverAddress: NetworkHostAndPort, private val myIdentity: PublicKey, private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, - val database: CordaPersistence, - val monitoringService: MonitoringService, + private val database: CordaPersistence, + private val monitoringService: MonitoringService, advertisedAddress: NetworkHostAndPort = serverAddress ) : ArtemisMessagingComponent(), MessagingService { companion object { diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt index 797f9aa7de..1433e71f85 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt @@ -2,23 +2,20 @@ package net.corda.node.services.transactions import net.corda.core.flows.FlowSession import net.corda.core.flows.NotaryFlow +import net.corda.core.node.ServiceHub import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TrustedAuthorityNotaryService -import net.corda.node.services.api.ServiceHubInternal -import net.corda.node.services.config.RaftConfig import java.security.PublicKey /** A non-validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */ -class RaftNonValidatingNotaryService(override val services: ServiceHubInternal, +class RaftNonValidatingNotaryService(override val services: ServiceHub, override val notaryIdentityKey: PublicKey, - raftConfig: RaftConfig) : TrustedAuthorityNotaryService() { + override val uniquenessProvider: RaftUniquenessProvider) : TrustedAuthorityNotaryService() { companion object { val id = constructId(validating = false, raft = true) } override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock) - override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services, raftConfig) - override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service { return NonValidatingNotaryFlow(otherPartySession, this) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt index 130f954a50..7c4897826a 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt @@ -1,6 +1,7 @@ package net.corda.node.services.transactions import com.codahale.metrics.Gauge +import com.codahale.metrics.MetricRegistry import io.atomix.catalyst.buffer.BufferInput import io.atomix.catalyst.buffer.BufferOutput import io.atomix.catalyst.serializer.Serializer @@ -25,10 +26,10 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.loggerFor -import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.RaftConfig import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.node.utilities.CordaPersistence +import net.corda.nodeapi.config.NodeSSLConfiguration import net.corda.nodeapi.config.SSLConfiguration import java.nio.file.Path import java.util.concurrent.CompletableFuture @@ -44,7 +45,7 @@ import javax.persistence.* * to the cluster leader to be actioned. */ @ThreadSafe -class RaftUniquenessProvider(private val services: ServiceHubInternal, private val raftConfig: RaftConfig) : UniquenessProvider, SingletonSerializeAsToken() { +class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfiguration, private val db: CordaPersistence, private val metrics: MetricRegistry, private val raftConfig: RaftConfig) : UniquenessProvider, SingletonSerializeAsToken() { companion object { private val log = loggerFor() @@ -77,13 +78,7 @@ class RaftUniquenessProvider(private val services: ServiceHubInternal, private v ) /** Directory storing the Raft log and state machine snapshots */ - private val storagePath: Path = services.configuration.baseDirectory - /** Address of the Copycat node run by this Corda node */ - /** The database to store the state machine state in */ - private val db: CordaPersistence = services.database - /** SSL configuration */ - private val transportConfiguration: SSLConfiguration = services.configuration - + private val storagePath: Path = transportConfiguration.baseDirectory private lateinit var _clientFuture: CompletableFuture private lateinit var server: CopycatServer @@ -177,15 +172,13 @@ class RaftUniquenessProvider(private val services: ServiceHubInternal, private v } private fun registerMonitoring() { - services.monitoringService.metrics.register("RaftCluster.ThisServerStatus", Gauge { + metrics.register("RaftCluster.ThisServerStatus", Gauge { server.state().name }) - - services.monitoringService.metrics.register("RaftCluster.MembersCount", Gauge { + metrics.register("RaftCluster.MembersCount", Gauge { server.cluster().members().size }) - - services.monitoringService.metrics.register("RaftCluster.Members", Gauge> { + metrics.register("RaftCluster.Members", Gauge> { server.cluster().members().map { it.address().toString() } }) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt index 4af9e2be74..8fd3448512 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt @@ -2,23 +2,20 @@ package net.corda.node.services.transactions import net.corda.core.flows.FlowSession import net.corda.core.flows.NotaryFlow +import net.corda.core.node.ServiceHub import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TrustedAuthorityNotaryService -import net.corda.node.services.api.ServiceHubInternal -import net.corda.node.services.config.RaftConfig import java.security.PublicKey /** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */ -class RaftValidatingNotaryService(override val services: ServiceHubInternal, +class RaftValidatingNotaryService(override val services: ServiceHub, override val notaryIdentityKey: PublicKey, - raftConfig: RaftConfig) : TrustedAuthorityNotaryService() { + override val uniquenessProvider: RaftUniquenessProvider) : TrustedAuthorityNotaryService() { companion object { val id = constructId(validating = true, raft = true) } override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock) - override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services, raftConfig) - override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service { return ValidatingNotaryFlow(otherPartySession, this) } diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 27181e599b..cf418a7fea 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -296,8 +296,8 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { return mockNet.createNode(MockNodeParameters(legalName = name), nodeFactory = { args -> object : MockNetwork.MockNode(args) { // That constructs a recording tx storage - override fun makeTransactionStorage(): WritableTransactionStorage { - return RecordingTransactionStorage(database, super.makeTransactionStorage()) + override fun makeTransactionStorage(database: CordaPersistence): WritableTransactionStorage { + return RecordingTransactionStorage(database, super.makeTransactionStorage(database)) } } }) diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt index 16aa63254d..a6b1acfa38 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt @@ -25,6 +25,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.services.api.VaultServiceInternal +import net.corda.node.services.persistence.HibernateConfiguration import net.corda.testing.chooseIdentity import net.corda.testing.node.MockNetwork import net.corda.testing.rigorousMock @@ -81,8 +82,8 @@ class VaultSoftLockManagerTest { } private val mockNet = MockNetwork(cordappPackages = listOf(ContractImpl::class.packageName), defaultFactory = { args -> object : MockNetwork.MockNode(args) { - override fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader): VaultServiceInternal { - val realVault = super.makeVaultService(keyManagementService, stateLoader) + override fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader, hibernateConfig: HibernateConfiguration): VaultServiceInternal { + val realVault = super.makeVaultService(keyManagementService, stateLoader, hibernateConfig) return object : VaultServiceInternal by realVault { override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet?) { mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests. diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt index 65bdd15987..2f59e327d7 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -35,6 +35,7 @@ import net.corda.node.services.transactions.BFTSMaRt import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor +import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.ServiceIdentityGenerator import net.corda.testing.DUMMY_NOTARY import net.corda.testing.common.internal.NetworkParametersCopier @@ -256,7 +257,7 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete // We only need to override the messaging service here, as currently everything that hits disk does so // through the java.nio API which we are already mocking via Jimfs. - override fun makeMessagingService(): MessagingService { + override fun makeMessagingService(database: CordaPersistence): MessagingService { require(id >= 0) { "Node ID must be zero or positive, was passed: " + id } return mockNet.messagingNetwork.createNodeWithID( !mockNet.threadPerNode, @@ -302,9 +303,9 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete override val serializationWhitelists: List get() = testSerializationWhitelists private var dbCloser: (() -> Any?)? = null - override fun initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: () -> T) = super.initialiseDatabasePersistence(schemaService) { + override fun initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: (CordaPersistence) -> T) = super.initialiseDatabasePersistence(schemaService) { database -> dbCloser = database::close - insideTransaction() + insideTransaction(database) } fun disableDBCloseOnStop() {