Workaround BFT bug that causes redundant replicas to get stuck in startup (#1100)

This commit is contained in:
Andrzej Cichocki 2017-07-26 19:41:29 +01:00 committed by GitHub
parent b546bec064
commit 25be649f7b
14 changed files with 114 additions and 48 deletions

View File

@ -1,4 +1,4 @@
gradlePluginsVersion=0.13.4
gradlePluginsVersion=0.13.5
kotlinVersion=1.1.1
guavaVersion=21.0
bouncycastleVersion=1.57

View File

@ -4,6 +4,7 @@ import static java.util.Collections.emptyList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -86,6 +87,6 @@ public class CordformNode implements NodeDefinition {
* @param id The (0-based) BFT replica ID.
*/
public void bftReplicaId(Integer id) {
config = config.withValue("bftReplicaId", ConfigValueFactory.fromAnyRef(id));
config = config.withValue("bftSMaRt", ConfigValueFactory.fromMap(Collections.singletonMap("replicaId", id)));
}
}

View File

@ -16,6 +16,7 @@ import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.Try
import net.corda.node.internal.AbstractNode
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.minClusterSize
@ -25,14 +26,11 @@ import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.MockNetwork
import org.bouncycastle.asn1.x500.X500Name
import org.junit.After
import org.junit.Ignore
import org.junit.Test
import java.nio.file.Files
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@Ignore("Replica 6 constructor goes around the loop in BaseStateManager.askCurrentConsensusId forever, so stop hangs when getting the Replica instance to dispose it."
+ "Happens frequently on Windows and has happened on Linux.")
class BFTNotaryServiceTests {
companion object {
private val clusterName = X500Name("CN=BFT,O=R3,OU=corda,L=Zurich,C=CH")
@ -46,7 +44,7 @@ class BFTNotaryServiceTests {
mockNet.stopNodes()
}
private fun bftNotaryCluster(clusterSize: Int): Party {
private fun bftNotaryCluster(clusterSize: Int, exposeRaces: Boolean = false): Party {
Files.deleteIfExists("config" / "currentView") // XXX: Make config object warn if this exists?
val replicaIds = (0 until clusterSize)
val party = ServiceIdentityGenerator.generateToDisk(
@ -60,13 +58,26 @@ class BFTNotaryServiceTests {
node.network.myAddress,
advertisedServices = bftNotaryService,
configOverrides = {
whenever(it.bftReplicaId).thenReturn(replicaId)
whenever(it.bftSMaRt).thenReturn(BFTSMaRtConfiguration(replicaId, false, exposeRaces))
whenever(it.notaryClusterAddresses).thenReturn(notaryClusterAddresses)
})
}
return party
}
/** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */
@Test
fun `all replicas start even if there is a new consensus during startup`() {
val notary = bftNotaryCluster(minClusterSize(1), true) // This true adds a sleep to expose the race.
val f = node.run {
val trivialTx = signInitialTransaction(notary) {}
// Create a new consensus while the redundant replica is sleeping:
services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture
}
mockNet.runNetwork()
f.getOrThrow()
}
@Test
fun `detect double spend 1 faulty`() {
detectDoubleSpend(1)

View File

@ -564,10 +564,20 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun makeAdvertisedServices(tokenizableServices: MutableList<Any>) {
val serviceTypes = info.advertisedServices.map { it.info.type }
if (NetworkMapService.type in serviceTypes) makeNetworkMapService()
val notaryServiceType = serviceTypes.singleOrNull { it.isNotary() }
if (notaryServiceType != null) {
makeCoreNotaryService(notaryServiceType, tokenizableServices)
val service = makeCoreNotaryService(notaryServiceType)
if (service != null) {
service.apply {
tokenizableServices.add(this)
runOnStop += this::stop
start()
}
installCoreFlow(NotaryFlow.Client::class, { party: Party, version: Int -> service.createServiceFlow(party, version) })
} else {
log.info("Notary type ${notaryServiceType.id} does not match any built-in notary types. " +
"It is expected to be loaded via a CorDapp")
}
}
}
@ -628,25 +638,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
inNodeNetworkMapService = PersistentNetworkMapService(services, configuration.minimumPlatformVersion)
}
open protected fun makeCoreNotaryService(type: ServiceType, tokenizableServices: MutableList<Any>) {
val service: NotaryService = when (type) {
open protected fun makeCoreNotaryService(type: ServiceType): NotaryService? {
return when (type) {
SimpleNotaryService.type -> SimpleNotaryService(services)
ValidatingNotaryService.type -> ValidatingNotaryService(services)
RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(services)
RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services)
BFTNonValidatingNotaryService.type -> BFTNonValidatingNotaryService(services)
else -> {
log.info("Notary type ${type.id} does not match any built-in notary types. " +
"It is expected to be loaded via a CorDapp")
return
}
else -> null
}
service.apply {
tokenizableServices.add(this)
runOnStop += this::stop
start()
}
installCoreFlow(NotaryFlow.Client::class, { party: Party, version: Int -> service.createServiceFlow(party, version) })
}
protected open fun makeIdentityService(trustRoot: X509Certificate,

View File

@ -167,7 +167,7 @@ open class NodeStartup(val args: Array<String>) {
}
open protected fun banJavaSerialisation(conf: FullNodeConfiguration) {
SerialFilter.install(if (conf.bftReplicaId != null) ::bftSMaRtSerialFilter else ::defaultSerialFilter)
SerialFilter.install(if (conf.bftSMaRt.isValid()) ::bftSMaRtSerialFilter else ::defaultSerialFilter)
}
open protected fun getVersionInfo(): VersionInfo {

View File

@ -13,6 +13,11 @@ import java.net.URL
import java.nio.file.Path
import java.util.*
/** @param exposeRaces for testing only, so its default is not in reference.conf but here. */
data class BFTSMaRtConfiguration(val replicaId: Int, val debug: Boolean, val exposeRaces: Boolean = false) {
fun isValid() = replicaId >= 0
}
interface NodeConfiguration : NodeSSLConfiguration {
val myLegalName: X500Name
val networkMapService: NetworkMapInfo?
@ -26,7 +31,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val certificateChainCheckPolicies: List<CertChainPolicyConfig>
val verifierType: VerifierType
val messageRedeliveryDelaySeconds: Int
val bftReplicaId: Int?
val bftSMaRt: BFTSMaRtConfiguration
val notaryNodeAddress: NetworkHostAndPort?
val notaryClusterAddresses: List<NetworkHostAndPort>
}
@ -56,7 +61,7 @@ data class FullNodeConfiguration(
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
val messagingServerAddress: NetworkHostAndPort?,
val extraAdvertisedServiceIds: List<String>,
override val bftReplicaId: Int?,
override val bftSMaRt: BFTSMaRtConfiguration,
override val notaryNodeAddress: NetworkHostAndPort?,
override val notaryClusterAddresses: List<NetworkHostAndPort>,
override val certificateChainCheckPolicies: List<CertChainPolicyConfig>,

View File

@ -142,6 +142,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
SerializeAsTokenContext(tokenizableServices, SERIALIZATION_FACTORY, CHECKPOINT_CONTEXT, serviceHub)
}
fun findServices(predicate: (Any) -> Boolean) = tokenizableServices.filter(predicate)
/** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
@Suppress("UNCHECKED_CAST")

View File

@ -23,21 +23,25 @@ import kotlin.concurrent.thread
*
* A transaction is notarised when the consensus is reached by the cluster on its uniqueness, and time-window validity.
*/
class BFTNonValidatingNotaryService(override val services: ServiceHubInternal) : NotaryService() {
class BFTNonValidatingNotaryService(override val services: ServiceHubInternal, cluster: BFTSMaRt.Cluster = distributedCluster) : NotaryService() {
companion object {
val type = SimpleNotaryService.type.getSubType("bft")
private val log = loggerFor<BFTNonValidatingNotaryService>()
private val distributedCluster = object : BFTSMaRt.Cluster {
override fun waitUntilAllReplicasHaveInitialized() {
log.warn("A replica may still be initializing, in which case the upcoming consensus change may cause it to spin.")
}
}
}
private val client: BFTSMaRt.Client
private val replicaHolder = SettableFuture.create<Replica>()
init {
val replicaId = services.configuration.bftReplicaId ?: throw IllegalArgumentException("bftReplicaId value must be specified in the configuration")
val config = BFTSMaRtConfig(services.configuration.notaryClusterAddresses)
client = config.use {
val configHandle = config.handle()
require(services.configuration.bftSMaRt.isValid()) { "bftSMaRt replicaId must be specified in the configuration" }
client = BFTSMaRtConfig(services.configuration.notaryClusterAddresses, services.configuration.bftSMaRt.debug, services.configuration.bftSMaRt.exposeRaces).use {
val replicaId = services.configuration.bftSMaRt.replicaId
val configHandle = it.handle()
// Replica startup must be in parallel with other replicas, otherwise the constructor may not return:
thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) {
configHandle.use {
@ -47,11 +51,15 @@ class BFTNonValidatingNotaryService(override val services: ServiceHubInternal) :
log.info("BFT SMaRt replica $replicaId is running.")
}
}
BFTSMaRt.Client(it, replicaId)
BFTSMaRt.Client(it, replicaId, cluster)
}
}
fun waitUntilReplicaHasInitialized() {
log.debug { "Waiting for replica ${services.configuration.bftSMaRt.replicaId} to initialize." }
replicaHolder.getOrThrow() // It's enough to wait for the ServiceReplica constructor to return.
}
fun commitTransaction(tx: Any, otherSide: Party) = client.commitTransaction(tx, otherSide)
override fun createServiceFlow(otherParty: Party, platformVersion: Int): FlowLogic<Void?> {

View File

@ -3,6 +3,7 @@ package net.corda.node.services.transactions
import bftsmart.communication.ServerCommunicationSystem
import bftsmart.communication.client.netty.NettyClientServerCommunicationSystemClientSide
import bftsmart.communication.client.netty.NettyClientServerSession
import bftsmart.statemanagement.strategy.StandardStateManager
import bftsmart.tom.MessageContext
import bftsmart.tom.ServiceProxy
import bftsmart.tom.ServiceReplica
@ -70,7 +71,12 @@ object BFTSMaRt {
data class Signatures(val txSignatures: List<DigitalSignature>) : ClusterResponse()
}
class Client(config: BFTSMaRtConfig, private val clientId: Int) : SingletonSerializeAsToken() {
interface Cluster {
/** Avoid bug where a replica fails to start due to a consensus change during the BFT startup sequence. */
fun waitUntilAllReplicasHaveInitialized()
}
class Client(config: BFTSMaRtConfig, private val clientId: Int, private val cluster: Cluster) : SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<Client>()
}
@ -100,6 +106,7 @@ object BFTSMaRt {
fun commitTransaction(transaction: Any, otherSide: Party): ClusterResponse {
require(transaction is FilteredTransaction || transaction is SignedTransaction) { "Unsupported transaction type: ${transaction.javaClass.name}" }
awaitClientConnectionToCluster()
cluster.waitUntilAllReplicasHaveInitialized()
val requestBytes = CommitRequest(transaction, otherSide).serialize().bytes
val responseBytes = proxy.invokeOrdered(requestBytes)
return responseBytes.deserialize<ClusterResponse>()
@ -175,6 +182,18 @@ object BFTSMaRt {
private val log = loggerFor<Replica>()
}
private val stateManagerOverride = run {
// Mock framework shutdown is not in reverse order, and we need to stop the faulty replicas first:
val exposeStartupRace = config.exposeRaces && replicaId < maxFaultyReplicas(config.clusterSize)
object : StandardStateManager() {
override fun askCurrentConsensusId() {
if (exposeStartupRace) Thread.sleep(20000) // Must be long enough for the non-redundant replicas to reach a non-initial consensus.
super.askCurrentConsensusId()
}
}
}
override fun getStateManager() = stateManagerOverride
// TODO: Use Requery with proper DB schema instead of JDBCHashMap.
// Must be initialised before ServiceReplica is started
private val commitLog = services.database.transaction { JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(tableName) }

View File

@ -17,15 +17,17 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
* Each instance of this class creates such a configHome, accessible via [path].
* The files are deleted on [close] typically via [use], see [PathManager] for details.
*/
class BFTSMaRtConfig(private val replicaAddresses: List<NetworkHostAndPort>, debug: Boolean = false) : PathManager<BFTSMaRtConfig>(Files.createTempDirectory("bft-smart-config")) {
class BFTSMaRtConfig(private val replicaAddresses: List<NetworkHostAndPort>, debug: Boolean, val exposeRaces: Boolean) : PathManager<BFTSMaRtConfig>(Files.createTempDirectory("bft-smart-config")) {
companion object {
private val log = loggerFor<BFTSMaRtConfig>()
internal val portIsClaimedFormat = "Port %s is claimed by another replica: %s"
}
val clusterSize get() = replicaAddresses.size
init {
val claimedPorts = mutableSetOf<NetworkHostAndPort>()
val n = replicaAddresses.size
val n = clusterSize
(0 until n).forEach { replicaId ->
// Each replica claims the configured port and the next one:
replicaPorts(replicaId).forEach { port ->

View File

@ -14,4 +14,8 @@ certificateSigningService = "https://cordaci-netperm.corda.r3cev.com"
useHTTPS = false
h2port = 0
useTestClock = false
verifierType = InMemory
verifierType = InMemory
bftSMaRt = {
replicaId = -1
debug = false
}

View File

@ -29,7 +29,7 @@ class FullNodeConfigurationTest {
rpcAddress = NetworkHostAndPort("localhost", 1),
messagingServerAddress = null,
extraAdvertisedServiceIds = emptyList(),
bftReplicaId = null,
bftSMaRt = BFTSMaRtConfiguration(-1, false),
notaryNodeAddress = null,
notaryClusterAddresses = emptyList(),
certificateChainCheckPolicies = emptyList(),

View File

@ -28,13 +28,13 @@ class BFTSMaRtConfigTests {
@Test
fun `overlapping port ranges are rejected`() {
fun addresses(vararg ports: Int) = ports.map { NetworkHostAndPort("localhost", it) }
assertThatThrownBy { BFTSMaRtConfig(addresses(11000, 11001)).use {} }
fun config(vararg ports: Int) = BFTSMaRtConfig(ports.map { NetworkHostAndPort("localhost", it) }, false, false)
assertThatThrownBy { config(11000, 11001).use {} }
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessage(portIsClaimedFormat.format("localhost:11001", setOf("localhost:11000", "localhost:11001")))
assertThatThrownBy { BFTSMaRtConfig(addresses(11001, 11000)).use {} }
assertThatThrownBy { config(11001, 11000).use {} }
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessage(portIsClaimedFormat.format("localhost:11001", setOf("localhost:11001", "localhost:11002", "localhost:11000")))
BFTSMaRtConfig(addresses(11000, 11002)).use {} // Non-overlapping.
config(11000, 11002).use {} // Non-overlapping.
}
}

View File

@ -20,9 +20,7 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.ServiceEntry
import net.corda.core.node.WorldMapLocation
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.AbstractNode
@ -32,9 +30,7 @@ import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.services.transactions.*
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.testing.*
@ -261,6 +257,24 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
var acceptableLiveFiberCountOnStop: Int = 0
override fun acceptableLiveFiberCountOnStop(): Int = acceptableLiveFiberCountOnStop
override fun makeCoreNotaryService(type: ServiceType): NotaryService? {
if (type != BFTNonValidatingNotaryService.type) return super.makeCoreNotaryService(type)
return BFTNonValidatingNotaryService(services, object : BFTSMaRt.Cluster {
override fun waitUntilAllReplicasHaveInitialized() {
val clusterNodes = mockNet.nodes.filter {
services.notaryIdentityKey in it.info.serviceIdentities(BFTNonValidatingNotaryService.type).map { it.owningKey }
}
if (clusterNodes.size != configuration.notaryClusterAddresses.size) {
throw IllegalStateException("Unable to enumerate all nodes in BFT cluster.")
}
clusterNodes.forEach {
val notaryService = it.smm.findServices { it is BFTNonValidatingNotaryService }.single() as BFTNonValidatingNotaryService
notaryService.waitUntilReplicaHasInitialized()
}
}
})
}
}
/**