INFRA-670 Stop shutting down the database before some services that use it (#6690)

* INFRA-670 Stop shutting down the database before some services that use it.

* Test fixes
This commit is contained in:
Rick Parker 2020-09-08 08:34:19 +01:00 committed by GitHub
parent 45fe3f3aef
commit fd341c7e48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 13 deletions

View File

@ -167,6 +167,8 @@ import net.corda.nodeapi.internal.persistence.OutstandingDatabaseChangesExceptio
import net.corda.nodeapi.internal.persistence.RestrictedConnection import net.corda.nodeapi.internal.persistence.RestrictedConnection
import net.corda.nodeapi.internal.persistence.RestrictedEntityManager import net.corda.nodeapi.internal.persistence.RestrictedEntityManager
import net.corda.nodeapi.internal.persistence.SchemaMigration import net.corda.nodeapi.internal.persistence.SchemaMigration
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.withoutDatabaseAccess
import net.corda.tools.shell.InteractiveShell import net.corda.tools.shell.InteractiveShell
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
import org.jolokia.jvmagent.JolokiaServer import org.jolokia.jvmagent.JolokiaServer
@ -252,7 +254,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
private val notaryLoader = configuration.notary?.let { private val notaryLoader = configuration.notary?.let {
NotaryLoader(it, versionInfo) NotaryLoader(it, versionInfo)
} }
val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop() val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop(false)
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize() val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize()
val identityService = PersistentIdentityService(cacheFactory).tokenize() val identityService = PersistentIdentityService(cacheFactory).tokenize()
val database: CordaPersistence = createCordaPersistence( val database: CordaPersistence = createCordaPersistence(
@ -387,8 +389,13 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
return this return this
} }
protected fun <T : AutoCloseable> T.closeOnStop(): T { protected fun <T : AutoCloseable> T.closeOnStop(usesDatabase: Boolean = true): T {
runOnStop += this::close if (usesDatabase) {
contextDatabase // Will throw if no database is available, since this would run after closing the database, yet claims it needs it.
runOnStop += this::close
} else {
runOnStop += { withoutDatabaseAccess { this.close() } }
}
return this return this
} }
@ -533,7 +540,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
installCoreFlows() installCoreFlows()
registerCordappFlows() registerCordappFlows()
services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
val rpcOps = makeRPCOps(cordappLoader, checkpointDumper)
startShell() startShell()
networkMapClient?.start(trustRoot) networkMapClient?.start(trustRoot)
@ -546,6 +552,11 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
networkMapCache.start(netParams.notaries) networkMapCache.start(netParams.notaries)
startDatabase() startDatabase()
// The following services need to be closed before the database, so need to be registered after it is started.
networkMapUpdater.closeOnStop()
schedulerService.closeOnStop()
val rpcOps = makeRPCOps(cordappLoader, checkpointDumper)
val (identity, identityKeyPair) = obtainIdentity() val (identity, identityKeyPair) = obtainIdentity()
X509Utilities.validateCertPath(trustRoot, identity.certPath) X509Utilities.validateCertPath(trustRoot, identity.certPath)
@ -815,7 +826,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
configuration.baseDirectory, configuration.baseDirectory,
configuration.extraNetworkMapKeys, configuration.extraNetworkMapKeys,
networkParametersStorage networkParametersStorage
).closeOnStop() )
protected open fun makeNodeSchedulerService() = NodeSchedulerService( protected open fun makeNodeSchedulerService() = NodeSchedulerService(
platformClock, platformClock,
@ -826,7 +837,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
nodeProperties, nodeProperties,
configuration.drainingModePollPeriod, configuration.drainingModePollPeriod,
unfinishedSchedules = busyNodeLatch unfinishedSchedules = busyNodeLatch
).tokenize().closeOnStop() ).tokenize()
private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader { private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader {
val generatedCordapps = mutableListOf(VirtualCordapp.generateCore(versionInfo)) val generatedCordapps = mutableListOf(VirtualCordapp.generateCore(versionInfo))

View File

@ -12,8 +12,14 @@ import net.corda.core.flows.InitiatedBy
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.* import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.NetworkParametersStorage
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.notary.NotaryService import net.corda.core.internal.notary.NotaryService
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
@ -26,6 +32,9 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.hours import net.corda.core.utilities.hours
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.coretesting.internal.rigorousMock
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.coretesting.internal.testThreadFactory
import net.corda.node.VersionInfo import net.corda.node.VersionInfo
import net.corda.node.internal.AbstractNode import net.corda.node.internal.AbstractNode
import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.internal.InitiatedFlowFactory
@ -33,7 +42,11 @@ import net.corda.node.internal.NodeFlowManager
import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.StartedNodeServices import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.config.* import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.NetworkParameterAcceptanceSettings
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.VerifierType
import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.BasicHSMKeyManagementService import net.corda.node.services.keys.BasicHSMKeyManagementService
import net.corda.node.services.keys.KeyManagementServiceInternal import net.corda.node.services.keys.KeyManagementServiceInternal
@ -51,11 +64,12 @@ import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.coretesting.internal.rigorousMock import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.coretesting.internal.stubs.CertificateStoreStubs import net.corda.testing.node.MockNetworkNotarySpec
import net.corda.coretesting.internal.testThreadFactory import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.* import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.TestClock
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
import org.apache.sshd.common.util.security.SecurityUtils import org.apache.sshd.common.util.security.SecurityUtils
import rx.Observable import rx.Observable
@ -377,7 +391,7 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
} }
override fun makeMessagingService(): MockNodeMessagingService { override fun makeMessagingService(): MockNodeMessagingService {
return MockNodeMessagingService(configuration, serverThread).closeOnStop() return MockNodeMessagingService(configuration, serverThread).closeOnStop(usesDatabase = false)
} }
override fun startMessagingService(rpcOps: RPCOps, override fun startMessagingService(rpcOps: RPCOps,