Fix the BFT startup consensus workaround. (#2042)

This commit is contained in:
Andrzej Cichocki 2017-11-13 15:45:26 +00:00 committed by GitHub
parent d7ce405ec5
commit b8b3911ceb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 18 deletions

View File

@ -108,7 +108,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override val database: CordaPersistence,
override val rpcOps: CordaRPCOps,
flowStarter: FlowStarter,
internal val schedulerService: NodeSchedulerService) : StartedNode<N> {
override val notaryService: NotaryService?) : StartedNode<N> {
override val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by services, FlowStarter by flowStarter {}
}
@ -181,10 +181,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
readNetworkParameters()
val schemaService = NodeSchemaService(cordappLoader)
// Do all of this in a database transaction so anything that might need a connection has one.
val startedImpl = initialiseDatabasePersistence(schemaService) {
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService) {
val transactionStorage = makeTransactionStorage()
val stateLoader = StateLoaderImpl(transactionStorage)
val services = makeServices(keyPairs, schemaService, transactionStorage, stateLoader)
val notaryService = makeNotaryService(services)
smm = makeStateMachineManager()
val flowStarter = FlowStarterImpl(serverThread, smm)
val schedulerService = NodeSchedulerService(
@ -213,7 +214,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
runOnStop += network::stop
StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, schedulerService)
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
// If we successfully loaded network data from database, we set this future to Unit.
services.networkMapCache.addNode(info)
@ -500,7 +501,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,
services.transactionVerifierService, services.validatedTransactions, services.contractUpgradeService,
services, cordappProvider, this)
makeNetworkServices(tokenizableServices)
return tokenizableServices
}
@ -555,14 +555,15 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
private fun makeNetworkServices(tokenizableServices: MutableList<Any>) {
configuration.notary?.let {
val notaryService = makeCoreNotaryService(it)
tokenizableServices.add(notaryService)
runOnStop += notaryService::stop
installCoreFlow(NotaryFlow.Client::class, notaryService::createServiceFlow)
log.info("Running core notary: ${notaryService.javaClass.name}")
notaryService.start()
private fun makeNotaryService(tokenizableServices: MutableList<Any>): NotaryService? {
return configuration.notary?.let {
makeCoreNotaryService(it).also {
tokenizableServices.add(it)
runOnStop += it::stop
installCoreFlow(NotaryFlow.Client::class, it::createServiceFlow)
log.info("Running core notary: ${it.javaClass.name}")
it.start()
}
}
}
@ -608,7 +609,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected open fun makeBFTCluster(notaryKey: PublicKey, bftSMaRtConfig: BFTSMaRtConfiguration): BFTSMaRt.Cluster {
return object : BFTSMaRt.Cluster {
override fun waitUntilAllReplicasHaveInitialized(notaryService: BFTNonValidatingNotaryService) {
override fun waitUntilAllReplicasHaveInitialized() {
log.warn("A BFT replica may still be initializing, in which case the upcoming consensus change may cause it to spin.")
}
}

View File

@ -7,6 +7,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.StateLoader
import net.corda.core.node.services.NotaryService
import net.corda.core.node.services.TransactionStorage
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.StartedNodeServices
@ -25,6 +26,7 @@ interface StartedNode<out N : AbstractNode> {
val network: MessagingService
val database: CordaPersistence
val rpcOps: CordaRPCOps
val notaryService: NotaryService?
fun dispose() = internals.stop()
fun <T : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<T>) = internals.registerInitiatedFlow(initiatedFlowClass)
}

View File

@ -72,7 +72,7 @@ object BFTSMaRt {
interface Cluster {
/** Avoid bug where a replica fails to start due to a consensus change during the BFT startup sequence. */
fun waitUntilAllReplicasHaveInitialized(notaryService: BFTNonValidatingNotaryService)
fun waitUntilAllReplicasHaveInitialized()
}
class Client(config: BFTSMaRtConfig, private val clientId: Int, private val cluster: Cluster, private val notaryService: BFTNonValidatingNotaryService) : SingletonSerializeAsToken() {
@ -106,7 +106,7 @@ object BFTSMaRt {
fun commitTransaction(transaction: Any, otherSide: Party): ClusterResponse {
require(transaction is FilteredTransaction || transaction is SignedTransaction) { "Unsupported transaction type: ${transaction.javaClass.name}" }
awaitClientConnectionToCluster()
cluster.waitUntilAllReplicasHaveInitialized(notaryService)
cluster.waitUntilAllReplicasHaveInitialized()
val requestBytes = CommitRequest(transaction, otherSide).serialize().bytes
val responseBytes = proxy.invokeOrdered(requestBytes)
return responseBytes.deserialize<ClusterResponse>()

View File

@ -323,13 +323,13 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
override fun makeBFTCluster(notaryKey: PublicKey, bftSMaRtConfig: BFTSMaRtConfiguration): BFTSMaRt.Cluster {
return object : BFTSMaRt.Cluster {
override fun waitUntilAllReplicasHaveInitialized(notaryService: BFTNonValidatingNotaryService) {
val clusterNodes = mockNet.nodes.filter { notaryKey in it.started!!.info.legalIdentities.map { it.owningKey } }
override fun waitUntilAllReplicasHaveInitialized() {
val clusterNodes = mockNet.nodes.map { it.started!! }.filter { notaryKey in it.info.legalIdentities.map { it.owningKey } }
if (clusterNodes.size != bftSMaRtConfig.clusterAddresses.size) {
throw IllegalStateException("Unable to enumerate all nodes in BFT cluster.")
}
clusterNodes.forEach {
notaryService.waitUntilReplicaHasInitialized()
(it.notaryService as BFTNonValidatingNotaryService).waitUntilReplicaHasInitialized()
}
}
}