Retire a lateinit var. (#2038)

This commit is contained in:
Andrzej Cichocki 2017-11-15 11:24:27 +00:00 committed by GitHub
parent c4a9320e70
commit b5fffa76f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 69 additions and 86 deletions

View File

@ -42,6 +42,10 @@ import net.corda.node.services.events.ScheduledActivityObserver
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.*
import net.corda.node.services.network.*
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
@ -130,7 +134,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected lateinit var attachments: NodeAttachmentService
protected lateinit var network: MessagingService
protected val runOnStop = ArrayList<() -> Any?>()
protected lateinit var database: CordaPersistence
protected val _nodeReadyFuture = openFuture<Unit>()
protected val networkMapClient: NetworkMapClient? by lazy { configuration.compatibilityZoneURL?.let(::NetworkMapClient) }
@ -153,7 +156,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
@Volatile private var _started: StartedNode<AbstractNode>? = null
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(flowStarter: FlowStarter): CordaRPCOps {
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence): CordaRPCOps {
return SecureCordaRPCOps(services, smm, database, flowStarter)
}
@ -185,16 +188,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
readNetworkParameters()
val schemaService = NodeSchemaService(cordappLoader)
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService) {
val transactionStorage = makeTransactionStorage()
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService) { database ->
val transactionStorage = makeTransactionStorage(database)
val stateLoader = StateLoaderImpl(transactionStorage)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader)
val notaryService = makeNotaryService(nodeServices)
smm = makeStateMachineManager()
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader, database)
val notaryService = makeNotaryService(nodeServices, database)
smm = makeStateMachineManager(database)
val flowStarter = FlowStarterImpl(serverThread, smm)
val schedulerService = NodeSchedulerService(
platformClock,
this@AbstractNode.database,
database,
flowStarter,
stateLoader,
unfinishedSchedules = busyNodeLatch,
@ -207,8 +210,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
}
}
makeVaultObservers(schedulerService)
val rpcOps = makeRPCOps(flowStarter)
makeVaultObservers(schedulerService, database.hibernateConfig)
val rpcOps = makeRPCOps(flowStarter, database)
startMessagingService(rpcOps)
installCoreFlows()
val cordaServices = installCordaServices(flowStarter)
@ -272,8 +275,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
protected abstract fun myAddresses(): List<NetworkHostAndPort>
protected open fun makeStateMachineManager(): StateMachineManager {
protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {
return StateMachineManagerImpl(
services,
checkpointStorage,
@ -493,7 +495,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
* Builds node internal, advertised, and plugin services.
* Returns a list of tokenizable services to be added to the serialisation context.
*/
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, stateLoader: StateLoader): MutableList<Any> {
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, stateLoader: StateLoader, database: CordaPersistence): MutableList<Any> {
checkpointStorage = DBCheckpointStorage()
val metrics = MetricRegistry()
attachments = NodeAttachmentService(metrics)
@ -507,8 +509,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
transactionStorage,
stateLoader,
MonitoringService(metrics),
cordappProvider)
network = makeMessagingService()
cordappProvider,
database)
network = makeMessagingService(database)
val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
services.keyManagementService, services.identityService, platformClock,
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,
@ -517,12 +520,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
return tokenizableServices
}
protected open fun makeTransactionStorage(): WritableTransactionStorage = DBTransactionStorage()
private fun makeVaultObservers(schedulerService: SchedulerService) {
protected open fun makeTransactionStorage(database: CordaPersistence): WritableTransactionStorage = DBTransactionStorage()
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration) {
VaultSoftLockManager.install(services.vaultService, smm)
ScheduledActivityObserver.install(services.vaultService, schedulerService)
HibernateObserver.install(services.vaultService.rawUpdates, database.hibernateConfig)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig)
}
@VisibleForTesting
@ -551,26 +553,26 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Specific class so that MockNode can catch it.
class DatabaseConfigurationException(msg: String) : CordaException(msg)
protected open fun <T> initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: () -> T): T {
protected open fun <T> initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: (CordaPersistence) -> T): T {
val props = configuration.dataSourceProperties
if (props.isNotEmpty()) {
this.database = configureDatabase(props, configuration.database, { _services.identityService }, schemaService)
val database = configureDatabase(props, configuration.database, { _services.identityService }, schemaService)
// Now log the vendor string as this will also cause a connection to be tested eagerly.
database.transaction {
log.info("Connected to ${database.dataSource.connection.metaData.databaseProductName} database.")
}
runOnStop += database::close
return database.transaction {
insideTransaction()
insideTransaction(database)
}
} else {
throw DatabaseConfigurationException("There must be a database configured.")
}
}
private fun makeNotaryService(tokenizableServices: MutableList<Any>): NotaryService? {
private fun makeNotaryService(tokenizableServices: MutableList<Any>, database: CordaPersistence): NotaryService? {
return configuration.notary?.let {
makeCoreNotaryService(it).also {
makeCoreNotaryService(it, database).also {
tokenizableServices.add(it)
runOnStop += it::stop
installCoreFlow(NotaryFlow.Client::class, it::createServiceFlow)
@ -598,24 +600,17 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) { "Node is too old for the network" }
}
private fun makeCoreNotaryService(notaryConfig: NotaryConfig): NotaryService {
private fun makeCoreNotaryService(notaryConfig: NotaryConfig, database: CordaPersistence): NotaryService {
val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service")
return if (notaryConfig.validating) {
if (notaryConfig.raft != null) {
RaftValidatingNotaryService(services, notaryKey, notaryConfig.raft)
} else if (notaryConfig.bftSMaRt != null) {
throw IllegalArgumentException("Validating BFTSMaRt notary not supported")
return notaryConfig.run {
if (raft != null) {
val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft)
(if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider)
} else if (bftSMaRt != null) {
if (validating) throw IllegalArgumentException("Validating BFTSMaRt notary not supported")
BFTNonValidatingNotaryService(services, notaryKey, bftSMaRt, makeBFTCluster(notaryKey, bftSMaRt))
} else {
ValidatingNotaryService(services, notaryKey)
}
} else {
if (notaryConfig.raft != null) {
RaftNonValidatingNotaryService(services, notaryKey, notaryConfig.raft)
} else if (notaryConfig.bftSMaRt != null) {
val cluster = makeBFTCluster(notaryKey, notaryConfig.bftSMaRt)
BFTNonValidatingNotaryService(services, notaryKey, notaryConfig.bftSMaRt, cluster)
} else {
SimpleNotaryService(services, notaryKey)
(if (validating) ::ValidatingNotaryService else ::SimpleNotaryService)(services, notaryKey)
}
}
}
@ -657,8 +652,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
_started = null
}
protected abstract fun makeMessagingService(): MessagingService
protected abstract fun makeMessagingService(database: CordaPersistence): MessagingService
protected abstract fun startMessagingService(rpcOps: RPCOps)
private fun obtainIdentity(notaryConfig: NotaryConfig?): Pair<PartyAndCertificate, KeyPair> {
@ -716,8 +710,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
protected open fun generateKeyPair() = cryptoGenerateKeyPair()
protected open fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader): VaultServiceInternal {
return NodeVaultService(platformClock, keyManagementService, stateLoader, database.hibernateConfig)
protected open fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader, hibernateConfig: HibernateConfiguration): VaultServiceInternal {
return NodeVaultService(platformClock, keyManagementService, stateLoader, hibernateConfig)
}
private inner class ServiceHubInternalImpl(
@ -730,7 +724,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override val validatedTransactions: WritableTransactionStorage,
private val stateLoader: StateLoader,
override val monitoringService: MonitoringService,
override val cordappProvider: CordappProviderInternal
override val cordappProvider: CordappProviderInternal,
override val database: CordaPersistence
) : SingletonSerializeAsToken(), ServiceHubInternal, StateLoader by stateLoader {
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage()
@ -739,18 +734,17 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override val networkMapCache by lazy {
NetworkMapCacheImpl(
PersistentNetworkMapCache(
this@AbstractNode.database,
database,
networkParameters.notaries),
identityService)
}
override val vaultService by lazy { makeVaultService(keyManagementService, stateLoader) }
override val vaultService by lazy { makeVaultService(keyManagementService, stateLoader, database.hibernateConfig) }
override val contractUpgradeService by lazy { ContractUpgradeServiceImpl() }
override val attachments: AttachmentStorage get() = this@AbstractNode.attachments
override val networkService: MessagingService get() = network
override val clock: Clock get() = platformClock
override val myInfo: NodeInfo get() = info
override val myNodeStateObservable: Observable<NodeState> get() = nodeStateObservable
override val database: CordaPersistence get() = this@AbstractNode.database
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" }

View File

@ -25,6 +25,7 @@ import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.utilities.AddressUtils
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.DemoClock
import net.corda.nodeapi.internal.ShutdownHook
import net.corda.nodeapi.internal.addShutdownHook
@ -128,8 +129,7 @@ open class Node(configuration: NodeConfiguration,
private var shutdownHook: ShutdownHook? = null
private lateinit var userService: RPCUserService
override fun makeMessagingService(): MessagingService {
override fun makeMessagingService(database: CordaPersistence): MessagingService {
userService = RPCUserServiceImpl(configuration.rpcUsers)
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
@ -214,7 +214,7 @@ open class Node(configuration: NodeConfiguration,
* This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details
* on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url
*/
override fun <T> initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: () -> T): T {
override fun <T> initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: (CordaPersistence) -> T): T {
val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url")
val h2Prefix = "jdbc:h2:file:"
if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) {

View File

@ -77,8 +77,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
private val serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
val database: CordaPersistence,
val monitoringService: MonitoringService,
private val database: CordaPersistence,
private val monitoringService: MonitoringService,
advertisedAddress: NetworkHostAndPort = serverAddress
) : ArtemisMessagingComponent(), MessagingService {
companion object {

View File

@ -2,23 +2,20 @@ package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.RaftConfig
import java.security.PublicKey
/** A non-validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
class RaftNonValidatingNotaryService(override val services: ServiceHubInternal,
class RaftNonValidatingNotaryService(override val services: ServiceHub,
override val notaryIdentityKey: PublicKey,
raftConfig: RaftConfig) : TrustedAuthorityNotaryService() {
override val uniquenessProvider: RaftUniquenessProvider) : TrustedAuthorityNotaryService() {
companion object {
val id = constructId(validating = false, raft = true)
}
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services, raftConfig)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service {
return NonValidatingNotaryFlow(otherPartySession, this)
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.transactions
import com.codahale.metrics.Gauge
import com.codahale.metrics.MetricRegistry
import io.atomix.catalyst.buffer.BufferInput
import io.atomix.catalyst.buffer.BufferOutput
import io.atomix.catalyst.serializer.Serializer
@ -25,10 +26,10 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.RaftConfig
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.config.NodeSSLConfiguration
import net.corda.nodeapi.config.SSLConfiguration
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
@ -44,7 +45,7 @@ import javax.persistence.*
* to the cluster leader to be actioned.
*/
@ThreadSafe
class RaftUniquenessProvider(private val services: ServiceHubInternal, private val raftConfig: RaftConfig) : UniquenessProvider, SingletonSerializeAsToken() {
class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfiguration, private val db: CordaPersistence, private val metrics: MetricRegistry, private val raftConfig: RaftConfig) : UniquenessProvider, SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<RaftUniquenessProvider>()
@ -77,13 +78,7 @@ class RaftUniquenessProvider(private val services: ServiceHubInternal, private v
)
/** Directory storing the Raft log and state machine snapshots */
private val storagePath: Path = services.configuration.baseDirectory
/** Address of the Copycat node run by this Corda node */
/** The database to store the state machine state in */
private val db: CordaPersistence = services.database
/** SSL configuration */
private val transportConfiguration: SSLConfiguration = services.configuration
private val storagePath: Path = transportConfiguration.baseDirectory
private lateinit var _clientFuture: CompletableFuture<CopycatClient>
private lateinit var server: CopycatServer
@ -177,15 +172,13 @@ class RaftUniquenessProvider(private val services: ServiceHubInternal, private v
}
private fun registerMonitoring() {
services.monitoringService.metrics.register("RaftCluster.ThisServerStatus", Gauge<String> {
metrics.register("RaftCluster.ThisServerStatus", Gauge<String> {
server.state().name
})
services.monitoringService.metrics.register("RaftCluster.MembersCount", Gauge<Int> {
metrics.register("RaftCluster.MembersCount", Gauge<Int> {
server.cluster().members().size
})
services.monitoringService.metrics.register("RaftCluster.Members", Gauge<List<String>> {
metrics.register("RaftCluster.Members", Gauge<List<String>> {
server.cluster().members().map { it.address().toString() }
})
}

View File

@ -2,23 +2,20 @@ package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.RaftConfig
import java.security.PublicKey
/** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
class RaftValidatingNotaryService(override val services: ServiceHubInternal,
class RaftValidatingNotaryService(override val services: ServiceHub,
override val notaryIdentityKey: PublicKey,
raftConfig: RaftConfig) : TrustedAuthorityNotaryService() {
override val uniquenessProvider: RaftUniquenessProvider) : TrustedAuthorityNotaryService() {
companion object {
val id = constructId(validating = true, raft = true)
}
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services, raftConfig)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service {
return ValidatingNotaryFlow(otherPartySession, this)
}

View File

@ -296,8 +296,8 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
return mockNet.createNode(MockNodeParameters(legalName = name), nodeFactory = { args ->
object : MockNetwork.MockNode(args) {
// That constructs a recording tx storage
override fun makeTransactionStorage(): WritableTransactionStorage {
return RecordingTransactionStorage(database, super.makeTransactionStorage())
override fun makeTransactionStorage(database: CordaPersistence): WritableTransactionStorage {
return RecordingTransactionStorage(database, super.makeTransactionStorage(database))
}
}
})

View File

@ -25,6 +25,7 @@ import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.persistence.HibernateConfiguration
import net.corda.testing.chooseIdentity
import net.corda.testing.node.MockNetwork
import net.corda.testing.rigorousMock
@ -81,8 +82,8 @@ class VaultSoftLockManagerTest {
}
private val mockNet = MockNetwork(cordappPackages = listOf(ContractImpl::class.packageName), defaultFactory = { args ->
object : MockNetwork.MockNode(args) {
override fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader): VaultServiceInternal {
val realVault = super.makeVaultService(keyManagementService, stateLoader)
override fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader, hibernateConfig: HibernateConfiguration): VaultServiceInternal {
val realVault = super.makeVaultService(keyManagementService, stateLoader, hibernateConfig)
return object : VaultServiceInternal by realVault {
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests.

View File

@ -35,6 +35,7 @@ import net.corda.node.services.transactions.BFTSMaRt
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.common.internal.NetworkParametersCopier
@ -256,7 +257,7 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
// We only need to override the messaging service here, as currently everything that hits disk does so
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(): MessagingService {
override fun makeMessagingService(database: CordaPersistence): MessagingService {
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
return mockNet.messagingNetwork.createNodeWithID(
!mockNet.threadPerNode,
@ -302,9 +303,9 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
override val serializationWhitelists: List<SerializationWhitelist>
get() = testSerializationWhitelists
private var dbCloser: (() -> Any?)? = null
override fun <T> initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: () -> T) = super.initialiseDatabasePersistence(schemaService) {
override fun <T> initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: (CordaPersistence) -> T) = super.initialiseDatabasePersistence(schemaService) { database ->
dbCloser = database::close
insideTransaction()
insideTransaction(database)
}
fun disableDBCloseOnStop() {