diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 952fae0e53..2ed2096e35 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -108,7 +108,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, override val database: CordaPersistence, override val rpcOps: CordaRPCOps, flowStarter: FlowStarter, - internal val schedulerService: NodeSchedulerService) : StartedNode { + override val notaryService: NotaryService?) : StartedNode { override val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by services, FlowStarter by flowStarter {} } @@ -181,10 +181,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration, readNetworkParameters() val schemaService = NodeSchemaService(cordappLoader) // Do all of this in a database transaction so anything that might need a connection has one. - val startedImpl = initialiseDatabasePersistence(schemaService) { + val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService) { val transactionStorage = makeTransactionStorage() val stateLoader = StateLoaderImpl(transactionStorage) val services = makeServices(keyPairs, schemaService, transactionStorage, stateLoader) + val notaryService = makeNotaryService(services) smm = makeStateMachineManager() val flowStarter = FlowStarterImpl(serverThread, smm) val schedulerService = NodeSchedulerService( @@ -213,7 +214,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader runOnStop += network::stop - StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, schedulerService) + Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) } // If we successfully loaded network data from database, we set this future to Unit. services.networkMapCache.addNode(info) @@ -500,7 +501,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, services.auditService, services.monitoringService, services.networkMapCache, services.schemaService, services.transactionVerifierService, services.validatedTransactions, services.contractUpgradeService, services, cordappProvider, this) - makeNetworkServices(tokenizableServices) return tokenizableServices } @@ -555,14 +555,15 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } - private fun makeNetworkServices(tokenizableServices: MutableList) { - configuration.notary?.let { - val notaryService = makeCoreNotaryService(it) - tokenizableServices.add(notaryService) - runOnStop += notaryService::stop - installCoreFlow(NotaryFlow.Client::class, notaryService::createServiceFlow) - log.info("Running core notary: ${notaryService.javaClass.name}") - notaryService.start() + private fun makeNotaryService(tokenizableServices: MutableList): NotaryService? { + return configuration.notary?.let { + makeCoreNotaryService(it).also { + tokenizableServices.add(it) + runOnStop += it::stop + installCoreFlow(NotaryFlow.Client::class, it::createServiceFlow) + log.info("Running core notary: ${it.javaClass.name}") + it.start() + } } } @@ -608,7 +609,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected open fun makeBFTCluster(notaryKey: PublicKey, bftSMaRtConfig: BFTSMaRtConfiguration): BFTSMaRt.Cluster { return object : BFTSMaRt.Cluster { - override fun waitUntilAllReplicasHaveInitialized(notaryService: BFTNonValidatingNotaryService) { + override fun waitUntilAllReplicasHaveInitialized() { log.warn("A BFT replica may still be initializing, in which case the upcoming consensus change may cause it to spin.") } } diff --git a/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt b/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt index fed9c073db..300e5a7d60 100644 --- a/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt @@ -7,6 +7,7 @@ import net.corda.core.flows.FlowLogic import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.NodeInfo import net.corda.core.node.StateLoader +import net.corda.core.node.services.NotaryService import net.corda.core.node.services.TransactionStorage import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.StartedNodeServices @@ -25,6 +26,7 @@ interface StartedNode { val network: MessagingService val database: CordaPersistence val rpcOps: CordaRPCOps + val notaryService: NotaryService? fun dispose() = internals.stop() fun > registerInitiatedFlow(initiatedFlowClass: Class) = internals.registerInitiatedFlow(initiatedFlowClass) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt index 96dd16f5b7..b1ea728694 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt @@ -72,7 +72,7 @@ object BFTSMaRt { interface Cluster { /** Avoid bug where a replica fails to start due to a consensus change during the BFT startup sequence. */ - fun waitUntilAllReplicasHaveInitialized(notaryService: BFTNonValidatingNotaryService) + fun waitUntilAllReplicasHaveInitialized() } class Client(config: BFTSMaRtConfig, private val clientId: Int, private val cluster: Cluster, private val notaryService: BFTNonValidatingNotaryService) : SingletonSerializeAsToken() { @@ -106,7 +106,7 @@ object BFTSMaRt { fun commitTransaction(transaction: Any, otherSide: Party): ClusterResponse { require(transaction is FilteredTransaction || transaction is SignedTransaction) { "Unsupported transaction type: ${transaction.javaClass.name}" } awaitClientConnectionToCluster() - cluster.waitUntilAllReplicasHaveInitialized(notaryService) + cluster.waitUntilAllReplicasHaveInitialized() val requestBytes = CommitRequest(transaction, otherSide).serialize().bytes val responseBytes = proxy.invokeOrdered(requestBytes) return responseBytes.deserialize() diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt index 9e63ec1ab4..a79a7bcefc 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -323,13 +323,13 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete override fun makeBFTCluster(notaryKey: PublicKey, bftSMaRtConfig: BFTSMaRtConfiguration): BFTSMaRt.Cluster { return object : BFTSMaRt.Cluster { - override fun waitUntilAllReplicasHaveInitialized(notaryService: BFTNonValidatingNotaryService) { - val clusterNodes = mockNet.nodes.filter { notaryKey in it.started!!.info.legalIdentities.map { it.owningKey } } + override fun waitUntilAllReplicasHaveInitialized() { + val clusterNodes = mockNet.nodes.map { it.started!! }.filter { notaryKey in it.info.legalIdentities.map { it.owningKey } } if (clusterNodes.size != bftSMaRtConfig.clusterAddresses.size) { throw IllegalStateException("Unable to enumerate all nodes in BFT cluster.") } clusterNodes.forEach { - notaryService.waitUntilReplicaHasInitialized() + (it.notaryService as BFTNonValidatingNotaryService).waitUntilReplicaHasInitialized() } } }