BFT fixes (#844)

* Fix BFT config that only worked with clusters of size 4
* Shutdown BFT properly so that tests can run back to back in theory
* Wait for initial connection between client and all replicas before touching it
* Add test for non-trivial BFT cluster
* Shutdown NodeBasedTest nodes in parallel, as BFT shutdown time is non-trivial
* Overlapping ports check no longer assumes all on localhost
* Fix overlapping ports test to actually check the messages
This commit is contained in:
Andrzej Cichocki
2017-06-20 11:01:52 +01:00
committed by GitHub
parent 66421692a3
commit 52c7100267
12 changed files with 236 additions and 139 deletions

View File

@ -33,6 +33,7 @@ import java.util.zip.ZipEntry
import java.util.zip.ZipInputStream import java.util.zip.ZipInputStream
import java.util.zip.ZipOutputStream import java.util.zip.ZipOutputStream
import kotlin.concurrent.withLock import kotlin.concurrent.withLock
import kotlin.reflect.KClass
import kotlin.reflect.KProperty import kotlin.reflect.KProperty
val Int.days: Duration get() = Duration.ofDays(this.toLong()) val Int.days: Duration get() = Duration.ofDays(this.toLong())
@ -471,3 +472,21 @@ fun <T> Class<T>.checkNotUnorderedHashMap() {
fun Class<*>.requireExternal(msg: String = "Internal class") fun Class<*>.requireExternal(msg: String = "Internal class")
= require(!name.startsWith("net.corda.node.") && !name.contains(".internal.")) { "$msg: $name" } = require(!name.startsWith("net.corda.node.") && !name.contains(".internal.")) { "$msg: $name" }
interface DeclaredField<T> {
companion object {
inline fun <reified T> Any?.declaredField(clazz: KClass<*>, name: String): DeclaredField<T> = declaredField(clazz.java, name)
inline fun <reified T> Any.declaredField(name: String): DeclaredField<T> = declaredField(javaClass, name)
inline fun <reified T> Any?.declaredField(clazz: Class<*>, name: String): DeclaredField<T> {
val javaField = clazz.getDeclaredField(name).apply { isAccessible = true }
val receiver = this
return object : DeclaredField<T> {
override var value
get() = javaField.get(receiver) as T
set(value) = javaField.set(receiver, value)
}
}
}
var value: T
}

View File

@ -1,14 +1,14 @@
package net.corda.node.services package net.corda.node.services
import net.corda.core.contracts.DummyContract import com.google.common.util.concurrent.Futures
import net.corda.core.contracts.StateAndRef import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.StateRef import net.corda.core.*
import net.corda.core.contracts.TransactionType import net.corda.core.contracts.*
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.appendToCommonName import net.corda.core.crypto.appendToCommonName
import net.corda.core.getOrThrow
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.ALICE import net.corda.core.utilities.ALICE
import net.corda.core.utilities.DUMMY_CA import net.corda.core.utilities.DUMMY_CA
import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY
@ -16,69 +16,107 @@ import net.corda.flows.NotaryError
import net.corda.flows.NotaryException import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow import net.corda.flows.NotaryFlow
import net.corda.node.internal.AbstractNode import net.corda.node.internal.AbstractNode
import net.corda.node.internal.Node
import net.corda.node.services.transactions.BFTNonValidatingNotaryService import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.minClusterSize
import net.corda.node.services.transactions.minCorrectReplicas import net.corda.node.services.transactions.minCorrectReplicas
import net.corda.node.utilities.ServiceIdentityGenerator import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.node.utilities.transaction import net.corda.node.utilities.transaction
import net.corda.testing.node.NodeBasedTest import net.corda.testing.node.NodeBasedTest
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import org.junit.Test import org.junit.Test
import java.util.* import java.nio.file.Files
import kotlin.test.assertEquals import kotlin.test.*
import kotlin.test.assertFailsWith
class BFTNotaryServiceTests : NodeBasedTest() { class BFTNotaryServiceTests : NodeBasedTest() {
@Test companion object {
fun `detect double spend`() { private val clusterName = X500Name("CN=BFT,O=R3,OU=corda,L=Zurich,C=CH")
val clusterName = X500Name("CN=BFT,O=R3,OU=corda,L=Zurich,C=CH") private val serviceType = BFTNonValidatingNotaryService.type
startBFTNotaryCluster(clusterName, 4, BFTNonValidatingNotaryService.type)
val alice = startNode(ALICE.name).getOrThrow()
val notaryParty = alice.netMapCache.getNotary(clusterName)!!
val inputState = issueState(alice, notaryParty)
val firstTxBuilder = TransactionType.General.Builder(notaryParty).withItems(inputState)
val firstSpendTx = alice.services.signInitialTransaction(firstTxBuilder)
alice.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture.getOrThrow()
val secondSpendBuilder = TransactionType.General.Builder(notaryParty).withItems(inputState).also {
it.addOutputState(DummyContract.SingleOwnerState(0, alice.info.legalIdentity))
}
val secondSpendTx = alice.services.signInitialTransaction(secondSpendBuilder)
val secondSpend = alice.services.startFlow(NotaryFlow.Client(secondSpendTx))
val ex = assertFailsWith(NotaryException::class) {
secondSpend.resultFuture.getOrThrow()
}
val error = ex.error as NotaryError.Conflict
assertEquals(error.txId, secondSpendTx.id)
} }
private fun issueState(node: AbstractNode, notary: Party) = node.run { private fun bftNotaryCluster(clusterSize: Int): ListenableFuture<Party> {
database.transaction { Files.deleteIfExists("config" / "currentView") // XXX: Make config object warn if this exists?
val builder = DummyContract.generateInitial(Random().nextInt(), notary, info.legalIdentity.ref(0)) val replicaIds = (0 until clusterSize)
val stx = services.signInitialTransaction(builder) val replicaNames = replicaIds.map { DUMMY_NOTARY.name.appendToCommonName(" $it") }
services.recordTransactions(listOf(stx)) val party = ServiceIdentityGenerator.generateToDisk(
StateAndRef(builder.outputStates().first(), StateRef(stx.id, 0))
}
}
private fun startBFTNotaryCluster(clusterName: X500Name,
clusterSize: Int,
serviceType: ServiceType) {
require(clusterSize > 0)
val replicaNames = (0 until clusterSize).map { DUMMY_NOTARY.name.appendToCommonName(" $it") }
ServiceIdentityGenerator.generateToDisk(
replicaNames.map { baseDirectory(it) }, replicaNames.map { baseDirectory(it) },
DUMMY_CA, DUMMY_CA,
serviceType.id, serviceType.id,
clusterName, clusterName).party
minCorrectReplicas(clusterSize)) val advertisedServices = setOf(ServiceInfo(serviceType, clusterName))
val serviceInfo = ServiceInfo(serviceType, clusterName) val config = mapOf("notaryClusterAddresses" to replicaIds.map { "localhost:${11000 + it * 10}" })
val notaryClusterAddresses = (0 until clusterSize).map { "localhost:${11000 + it * 10}" } return Futures.allAsList(replicaIds.map {
(0 until clusterSize).forEach {
startNode( startNode(
replicaNames[it], replicaNames[it],
advertisedServices = setOf(serviceInfo), advertisedServices = advertisedServices,
configOverrides = mapOf("bftReplicaId" to it, "notaryClusterAddresses" to notaryClusterAddresses) configOverrides = mapOf("bftReplicaId" to it) + config
).getOrThrow() )
}).map { party }
}
@Test
fun `detect double spend 1 faulty`() {
detectDoubleSpend(1)
}
@Test
fun `detect double spend 2 faulty`() {
detectDoubleSpend(2)
}
private fun detectDoubleSpend(faultyReplicas: Int) {
val clusterSize = minClusterSize(faultyReplicas)
val aliceFuture = startNode(ALICE.name)
val notary = bftNotaryCluster(clusterSize).getOrThrow()
aliceFuture.getOrThrow().run {
val issueTx = signInitialTransaction(notary) {
addOutputState(DummyContract.SingleOwnerState(owner = info.legalIdentity))
}
database.transaction {
services.recordTransactions(issueTx)
}
val spendTxs = (1..10).map {
signInitialTransaction(notary, true) {
addInputState(issueTx.tx.outRef<ContractState>(0))
}
}
assertEquals(spendTxs.size, spendTxs.map { it.id }.distinct().size)
val flows = spendTxs.map { NotaryFlow.Client(it) }
val stateMachines = flows.map { services.startFlow(it) }
val results = stateMachines.map { ErrorOr.catch { it.resultFuture.getOrThrow() } }
val successfulIndex = results.mapIndexedNotNull { index, result ->
if (result.error == null) {
val signers = result.getOrThrow().map { it.by }
assertEquals(minCorrectReplicas(clusterSize), signers.size)
signers.forEach {
assertTrue(it in (notary.owningKey as CompositeKey).leafKeys)
}
index
} else {
null
}
}.single()
spendTxs.zip(results).forEach { (tx, result) ->
if (result.error != null) {
val error = (result.error as NotaryException).error as NotaryError.Conflict
assertEquals(tx.id, error.txId)
val (stateRef, consumingTx) = error.conflict.verified().stateHistory.entries.single()
assertEquals(StateRef(issueTx.id, 0), stateRef)
assertEquals(spendTxs[successfulIndex].id, consumingTx.id)
assertEquals(0, consumingTx.inputIndex)
assertEquals(info.legalIdentity, consumingTx.requestingParty)
} }
} }
} }
}
}
private fun AbstractNode.signInitialTransaction(
notary: Party,
makeUnique: Boolean = false,
block: TransactionType.General.Builder.() -> Any?
) = services.signInitialTransaction(TransactionType.General.Builder(notary).apply {
block()
if (makeUnique) {
addAttachment(SecureHash.randomSHA256())
}
})

View File

@ -1,7 +1,8 @@
package net.corda.node package net.corda.node
import net.corda.core.DeclaredField
import net.corda.core.DeclaredField.Companion.declaredField
import net.corda.node.internal.Node import net.corda.node.internal.Node
import java.lang.reflect.Field
import java.lang.reflect.Method import java.lang.reflect.Method
import java.lang.reflect.Proxy import java.lang.reflect.Proxy
@ -11,7 +12,7 @@ internal object SerialFilter {
private val undecided: Any private val undecided: Any
private val rejected: Any private val rejected: Any
private val serialFilterLock: Any private val serialFilterLock: Any
private val serialFilterField: Field private val serialFilterField: DeclaredField<Any>
init { init {
// ObjectInputFilter and friends are in java.io in Java 9 but sun.misc in backports: // ObjectInputFilter and friends are in java.io in Java 9 but sun.misc in backports:
@ -31,8 +32,8 @@ internal object SerialFilter {
undecided = statusEnum.getField("UNDECIDED").get(null) undecided = statusEnum.getField("UNDECIDED").get(null)
rejected = statusEnum.getField("REJECTED").get(null) rejected = statusEnum.getField("REJECTED").get(null)
val configClass = Class.forName("${filterInterface.name}\$Config") val configClass = Class.forName("${filterInterface.name}\$Config")
serialFilterLock = configClass.getDeclaredField("serialFilterLock").also { it.isAccessible = true }.get(null) serialFilterLock = declaredField<Any>(configClass, "serialFilterLock").value
serialFilterField = configClass.getDeclaredField("serialFilter").also { it.isAccessible = true } serialFilterField = declaredField(configClass, "serialFilter")
} }
internal fun install(acceptClass: (Class<*>) -> Boolean) { internal fun install(acceptClass: (Class<*>) -> Boolean) {
@ -46,7 +47,7 @@ internal object SerialFilter {
} }
// Can't simply use the setter as in non-trampoline mode Capsule has inited the filter in premain: // Can't simply use the setter as in non-trampoline mode Capsule has inited the filter in premain:
synchronized(serialFilterLock) { synchronized(serialFilterLock) {
serialFilterField.set(null, filter) serialFilterField.value = filter
} }
} }

View File

@ -669,8 +669,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
BFTNonValidatingNotaryService.type -> with(configuration as FullNodeConfiguration) { BFTNonValidatingNotaryService.type -> with(configuration as FullNodeConfiguration) {
val replicaId = bftReplicaId ?: throw IllegalArgumentException("bftReplicaId value must be specified in the configuration") val replicaId = bftReplicaId ?: throw IllegalArgumentException("bftReplicaId value must be specified in the configuration")
BFTSMaRtConfig(notaryClusterAddresses).use { config -> BFTSMaRtConfig(notaryClusterAddresses).use { config ->
val client = BFTSMaRt.Client(config, replicaId).also { tokenizableServices += it } // (Ab)use replicaId for clientId. BFTNonValidatingNotaryService(config, services, timeWindowChecker, replicaId, database).also {
BFTNonValidatingNotaryService(config, services, timeWindowChecker, replicaId, database, client) tokenizableServices += it.client
runOnStop += it::dispose
}
} }
} }
else -> { else -> {

View File

@ -6,6 +6,7 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.DeclaredField.Companion.declaredField
import net.corda.core.ErrorOr import net.corda.core.ErrorOr
import net.corda.core.abbreviate import net.corda.core.abbreviate
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
@ -40,11 +41,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override val flowInitiator: FlowInitiator) : Fiber<Unit>(id.toString(), scheduler), FlowStateMachine<R> { override val flowInitiator: FlowInitiator) : Fiber<Unit>(id.toString(), scheduler), FlowStateMachine<R> {
companion object { companion object {
// Used to work around a small limitation in Quasar. // Used to work around a small limitation in Quasar.
private val QUASAR_UNBLOCKER = run { private val QUASAR_UNBLOCKER = declaredField<Any>(Fiber::class, "SERIALIZER_BLOCKER").value
val field = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER")
field.isAccessible = true
field.get(null)
}
/** /**
* Return the current [FlowStateMachineImpl] or null if executing outside of one. * Return the current [FlowStateMachineImpl] or null if executing outside of one.

View File

@ -1,8 +1,10 @@
package net.corda.node.services.transactions package net.corda.node.services.transactions
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.DigitalSignature
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
@ -14,7 +16,6 @@ import net.corda.core.utilities.unwrap
import net.corda.flows.NotaryException import net.corda.flows.NotaryException
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import java.nio.file.Path
import kotlin.concurrent.thread import kotlin.concurrent.thread
/** /**
@ -25,18 +26,27 @@ import kotlin.concurrent.thread
class BFTNonValidatingNotaryService(config: BFTSMaRtConfig, class BFTNonValidatingNotaryService(config: BFTSMaRtConfig,
services: ServiceHubInternal, services: ServiceHubInternal,
timeWindowChecker: TimeWindowChecker, timeWindowChecker: TimeWindowChecker,
serverId: Int, replicaId: Int,
db: Database, db: Database) : NotaryService {
private val client: BFTSMaRt.Client) : NotaryService { val client = BFTSMaRt.Client(config, replicaId) // (Ab)use replicaId for clientId.
private val replicaHolder = SettableFuture.create<Replica>()
init { init {
// Replica startup must be in parallel with other replicas, otherwise the constructor may not return:
val configHandle = config.handle() val configHandle = config.handle()
thread(name = "BFTSmartServer-$serverId", isDaemon = true) { thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) {
configHandle.use { configHandle.use {
Server(configHandle.path, serverId, db, "bft_smart_notary_committed_states", services, timeWindowChecker) replicaHolder.set(Replica(it, replicaId, db, "bft_smart_notary_committed_states", services, timeWindowChecker))
log.info("BFT SMaRt replica $replicaId is running.")
} }
} }
} }
fun dispose() {
replicaHolder.getOrThrow().dispose()
client.dispose()
}
companion object { companion object {
val type = SimpleNotaryService.type.getSubType("bft") val type = SimpleNotaryService.type.getSubType("bft")
private val log = loggerFor<BFTNonValidatingNotaryService>() private val log = loggerFor<BFTNonValidatingNotaryService>()
@ -67,12 +77,12 @@ class BFTNonValidatingNotaryService(config: BFTSMaRtConfig,
} }
} }
private class Server(configHome: Path, private class Replica(config: BFTSMaRtConfig,
id: Int, replicaId: Int,
db: Database, db: Database,
tableName: String, tableName: String,
services: ServiceHubInternal, services: ServiceHubInternal,
timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Server(configHome, id, db, tableName, services, timeWindowChecker) { timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, db, tableName, services, timeWindowChecker) {
override fun executeCommand(command: ByteArray): ByteArray { override fun executeCommand(command: ByteArray): ByteArray {
val request = command.deserialize<BFTSMaRt.CommitRequest>() val request = command.deserialize<BFTSMaRt.CommitRequest>()

View File

@ -1,12 +1,17 @@
package net.corda.node.services.transactions package net.corda.node.services.transactions
import bftsmart.communication.ServerCommunicationSystem
import bftsmart.communication.client.netty.NettyClientServerCommunicationSystemClientSide
import bftsmart.communication.client.netty.NettyClientServerSession
import bftsmart.tom.MessageContext import bftsmart.tom.MessageContext
import bftsmart.tom.ServiceProxy import bftsmart.tom.ServiceProxy
import bftsmart.tom.ServiceReplica import bftsmart.tom.ServiceReplica
import bftsmart.tom.core.TOMLayer
import bftsmart.tom.core.messages.TOMMessage import bftsmart.tom.core.messages.TOMMessage
import bftsmart.tom.server.defaultservices.DefaultRecoverable import bftsmart.tom.server.defaultservices.DefaultRecoverable
import bftsmart.tom.server.defaultservices.DefaultReplier import bftsmart.tom.server.defaultservices.DefaultReplier
import bftsmart.tom.util.Extractor import bftsmart.tom.util.Extractor
import net.corda.core.DeclaredField.Companion.declaredField
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.DigitalSignature
@ -20,6 +25,7 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.toTypedArray
import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
@ -28,7 +34,7 @@ import net.corda.flows.NotaryError
import net.corda.flows.NotaryException import net.corda.flows.NotaryException
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.BFTSMaRt.Client import net.corda.node.services.transactions.BFTSMaRt.Client
import net.corda.node.services.transactions.BFTSMaRt.Server import net.corda.node.services.transactions.BFTSMaRt.Replica
import net.corda.node.utilities.JDBCHashMap import net.corda.node.utilities.JDBCHashMap
import net.corda.node.utilities.transaction import net.corda.node.utilities.transaction
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
@ -37,10 +43,9 @@ import java.util.*
/** /**
* Implements a replicated transaction commit log based on the [BFT-SMaRt](https://github.com/bft-smart/library) * Implements a replicated transaction commit log based on the [BFT-SMaRt](https://github.com/bft-smart/library)
* consensus algorithm. Every replica in the cluster is running a [Server] maintaining the state, and a[Client] is used * consensus algorithm. Every replica in the cluster is running a [Replica] maintaining the state, and a [Client] is used
* to to relay state modification requests to all [Server]s. * to relay state modification requests to all [Replica]s.
*/ */
// TODO: Write bft-smart host config file based on Corda node configuration.
// TODO: Define and document the configuration of the bft-smart cluster. // TODO: Define and document the configuration of the bft-smart cluster.
// TODO: Potentially update the bft-smart API for our use case or rebuild client and server from lower level building // TODO: Potentially update the bft-smart API for our use case or rebuild client and server from lower level building
// blocks bft-smart provides. // blocks bft-smart provides.
@ -49,18 +54,18 @@ import java.util.*
// consensus about membership changes). Nodes that join the cluster for the first time or re-join can go through // consensus about membership changes). Nodes that join the cluster for the first time or re-join can go through
// a "recovering" state and request missing data from their peers. // a "recovering" state and request missing data from their peers.
object BFTSMaRt { object BFTSMaRt {
/** Sent from [Client] to [Server]. */ /** Sent from [Client] to [Replica]. */
@CordaSerializable @CordaSerializable
data class CommitRequest(val tx: Any, val callerIdentity: Party) data class CommitRequest(val tx: Any, val callerIdentity: Party)
/** Sent from [Server] to [Client]. */ /** Sent from [Replica] to [Client]. */
@CordaSerializable @CordaSerializable
sealed class ReplicaResponse { sealed class ReplicaResponse {
data class Error(val error: NotaryError) : ReplicaResponse() data class Error(val error: NotaryError) : ReplicaResponse()
data class Signature(val txSignature: DigitalSignature) : ReplicaResponse() data class Signature(val txSignature: DigitalSignature) : ReplicaResponse()
} }
/** An aggregate response from all replica ([Server]) replies sent from [Client] back to the calling application. */ /** An aggregate response from all replica ([Replica]) replies sent from [Client] back to the calling application. */
@CordaSerializable @CordaSerializable
sealed class ClusterResponse { sealed class ClusterResponse {
data class Error(val error: NotaryError) : ClusterResponse() data class Error(val error: NotaryError) : ClusterResponse()
@ -68,15 +73,26 @@ object BFTSMaRt {
} }
class Client(config: BFTSMaRtConfig, private val clientId: Int) : SingletonSerializeAsToken() { class Client(config: BFTSMaRtConfig, private val clientId: Int) : SingletonSerializeAsToken() {
private val configHandle = config.handle()
companion object { companion object {
private val log = loggerFor<Client>() private val log = loggerFor<Client>()
} }
/** A proxy for communicating with the BFT cluster */ /** A proxy for communicating with the BFT cluster */
private val proxy: ServiceProxy by lazy { private val proxy = ServiceProxy(clientId, config.path.toString(), buildResponseComparator(), buildExtractor())
configHandle.use { buildProxy(it.path) } private val sessionTable = (proxy.communicationSystem as NettyClientServerCommunicationSystemClientSide).declaredField<Map<Int, NettyClientServerSession>>("sessionTable").value
fun dispose() {
proxy.close() // XXX: Does this do enough?
}
private fun awaitClientConnectionToCluster() {
// TODO: Hopefully we only need to wait for the client's initial connection to the cluster, and this method can be moved to some startup code.
while (true) {
val inactive = sessionTable.entries.mapNotNull { if (it.value.channel.isActive) null else it.key }
if (inactive.isEmpty()) break
log.info("Client-replica channels not yet active: $clientId to $inactive")
Thread.sleep((inactive.size * 100).toLong())
}
} }
/** /**
@ -85,16 +101,10 @@ object BFTSMaRt {
*/ */
fun commitTransaction(transaction: Any, otherSide: Party): ClusterResponse { fun commitTransaction(transaction: Any, otherSide: Party): ClusterResponse {
require(transaction is FilteredTransaction || transaction is SignedTransaction) { "Unsupported transaction type: ${transaction.javaClass.name}" } require(transaction is FilteredTransaction || transaction is SignedTransaction) { "Unsupported transaction type: ${transaction.javaClass.name}" }
val request = CommitRequest(transaction, otherSide) awaitClientConnectionToCluster()
val responseBytes = proxy.invokeOrdered(request.serialize().bytes) val requestBytes = CommitRequest(transaction, otherSide).serialize().bytes
val response = responseBytes.deserialize<ClusterResponse>() val responseBytes = proxy.invokeOrdered(requestBytes)
return response return responseBytes.deserialize<ClusterResponse>()
}
private fun buildProxy(configHome: Path): ServiceProxy {
val comparator = buildResponseComparator()
val extractor = buildExtractor()
return ServiceProxy(clientId, configHome.toString(), comparator, extractor)
} }
/** A comparator to check if replies from two replicas are the same. */ /** A comparator to check if replies from two replicas are the same. */
@ -136,29 +146,46 @@ object BFTSMaRt {
} }
} }
/** ServiceReplica doesn't have any kind of shutdown method, so we add one in this subclass. */
private class CordaServiceReplica(replicaId: Int, configHome: Path, owner: DefaultRecoverable) : ServiceReplica(replicaId, configHome.toString(), owner, owner, null, DefaultReplier()) {
private val tomLayerField = declaredField<TOMLayer>(ServiceReplica::class, "tomLayer")
private val csField = declaredField<ServerCommunicationSystem>(ServiceReplica::class, "cs")
fun dispose() {
// Half of what restart does:
val tomLayer = tomLayerField.value
tomLayer.shutdown() // Non-blocking.
val cs = csField.value
cs.join()
cs.serversConn.join()
tomLayer.join()
tomLayer.deliveryThread.join()
// TODO: At the cluster level, join all Sender/Receiver threads.
}
}
/** /**
* Maintains the commit log and executes commit commands received from the [Client]. * Maintains the commit log and executes commit commands received from the [Client].
* *
* The validation logic can be specified by implementing the [executeCommand] method. * The validation logic can be specified by implementing the [executeCommand] method.
*/ */
@Suppress("LeakingThis") abstract class Replica(config: BFTSMaRtConfig,
abstract class Server(configHome: Path, replicaId: Int,
val replicaId: Int, private val db: Database,
val db: Database,
tableName: String, tableName: String,
val services: ServiceHubInternal, private val services: ServiceHubInternal,
val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() { private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() {
companion object { companion object {
private val log = loggerFor<Server>() private val log = loggerFor<Replica>()
} }
// TODO: Use Requery with proper DB schema instead of JDBCHashMap. // TODO: Use Requery with proper DB schema instead of JDBCHashMap.
// Must be initialised before ServiceReplica is started // Must be initialised before ServiceReplica is started
val commitLog = db.transaction { JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(tableName) } private val commitLog = db.transaction { JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(tableName) }
@Suppress("LeakingThis")
private val replica = CordaServiceReplica(replicaId, config.path, this)
init { fun dispose() {
// TODO: Looks like this statement is blocking. Investigate the bft-smart node startup. replica.dispose()
ServiceReplica(replicaId, configHome.toString(), this, this, null, DefaultReplier())
} }
override fun appExecuteUnordered(command: ByteArray, msgCtx: MessageContext): ByteArray? { override fun appExecuteUnordered(command: ByteArray, msgCtx: MessageContext): ByteArray? {
@ -166,10 +193,7 @@ object BFTSMaRt {
} }
override fun appExecuteBatch(command: Array<ByteArray>, mcs: Array<MessageContext>): Array<ByteArray?> { override fun appExecuteBatch(command: Array<ByteArray>, mcs: Array<MessageContext>): Array<ByteArray?> {
val replies = command.zip(mcs) { c, _ -> return Arrays.stream(command).map(this::executeCommand).toTypedArray()
executeCommand(c)
}
return replies.toTypedArray()
} }
/** /**

View File

@ -12,27 +12,27 @@ import java.nio.file.Files
* Each instance of this class creates such a configHome, accessible via [path]. * Each instance of this class creates such a configHome, accessible via [path].
* The files are deleted on [close] typically via [use], see [PathManager] for details. * The files are deleted on [close] typically via [use], see [PathManager] for details.
*/ */
class BFTSMaRtConfig(replicaAddresses: List<HostAndPort>) : PathManager<BFTSMaRtConfig>(Files.createTempDirectory("bft-smart-config")) { class BFTSMaRtConfig(private val replicaAddresses: List<HostAndPort>, debug: Boolean = false) : PathManager<BFTSMaRtConfig>(Files.createTempDirectory("bft-smart-config")) {
companion object { companion object {
internal val portIsClaimedFormat = "Port %s is claimed by another replica: %s" internal val portIsClaimedFormat = "Port %s is claimed by another replica: %s"
} }
init { init {
val claimedPorts = mutableSetOf<Int>() val claimedPorts = mutableSetOf<HostAndPort>()
replicaAddresses.map { it.port }.forEach { base -> val n = replicaAddresses.size
(0 until n).forEach { replicaId ->
// Each replica claims the configured port and the next one: // Each replica claims the configured port and the next one:
(0..1).map { base + it }.forEach { port -> replicaPorts(replicaId).forEach { port ->
claimedPorts.add(port) || throw IllegalArgumentException(portIsClaimedFormat.format(port, claimedPorts)) claimedPorts.add(port) || throw IllegalArgumentException(portIsClaimedFormat.format(port, claimedPorts))
} }
} }
configWriter("hosts.config") { configWriter("hosts.config") {
replicaAddresses.forEachIndexed { index, address -> replicaAddresses.forEachIndexed { index, address ->
// The documentation strongly recommends IP addresses: // The documentation strongly recommends IP addresses:
println("${index} ${InetAddress.getByName(address.host).hostAddress} ${address.port}") println("$index ${InetAddress.getByName(address.host).hostAddress} ${address.port}")
} }
} }
val n = replicaAddresses.size val systemConfig = String.format(javaClass.getResource("system.config.printf").readText(), n, maxFaultyReplicas(n), if (debug) 1 else 0, (0 until n).joinToString(","))
val systemConfig = String.format(javaClass.getResource("system.config.printf").readText(), n, maxFaultyReplicas(n))
configWriter("system.config") { configWriter("system.config") {
print(systemConfig) print(systemConfig)
} }
@ -46,6 +46,11 @@ class BFTSMaRtConfig(replicaAddresses: List<HostAndPort>) : PathManager<BFTSMaRt
} }
} }
} }
private fun replicaPorts(replicaId: Int): List<HostAndPort> {
val base = replicaAddresses[replicaId]
return (0..1).map { HostAndPort.fromParts(base.host, base.port + it) }
}
} }
fun maxFaultyReplicas(clusterSize: Int) = (clusterSize - 1) / 3 fun maxFaultyReplicas(clusterSize: Int) = (clusterSize - 1) / 3

View File

@ -32,9 +32,8 @@ object ServiceIdentityGenerator {
serviceCa: CertificateAndKeyPair, serviceCa: CertificateAndKeyPair,
serviceId: String, serviceId: String,
serviceName: X500Name, serviceName: X500Name,
threshold: Int = 1) { threshold: Int = 1): PartyAndCertificate {
log.trace { "Generating a group identity \"serviceName\" for nodes: ${dirs.joinToString()}" } log.trace { "Generating a group identity \"serviceName\" for nodes: ${dirs.joinToString()}" }
val keyPairs = (1..dirs.size).map { generateKeyPair() } val keyPairs = (1..dirs.size).map { generateKeyPair() }
val notaryKey = CompositeKey.Builder().addKeys(keyPairs.map { it.public }).build(threshold) val notaryKey = CompositeKey.Builder().addKeys(keyPairs.map { it.public }).build(threshold)
// TODO: This doesn't work until we have composite keys in X.509 certificates, so we make up a certificate that nothing checks // TODO: This doesn't work until we have composite keys in X.509 certificates, so we make up a certificate that nothing checks
@ -42,15 +41,16 @@ object ServiceIdentityGenerator {
// serviceCa.keyPair, serviceName, notaryKey) // serviceCa.keyPair, serviceName, notaryKey)
val notaryCert = X509Utilities.createSelfSignedCACertificate(serviceName, generateKeyPair()) val notaryCert = X509Utilities.createSelfSignedCACertificate(serviceName, generateKeyPair())
val notaryCertPath = X509Utilities.createCertificatePath(serviceCa.certificate, notaryCert, revocationEnabled = false) val notaryCertPath = X509Utilities.createCertificatePath(serviceCa.certificate, notaryCert, revocationEnabled = false)
val notaryParty = PartyAndCertificate(serviceName, notaryKey, notaryCert, notaryCertPath).serialize() val notaryParty = PartyAndCertificate(serviceName, notaryKey, notaryCert, notaryCertPath)
val notaryPartyBytes = notaryParty.serialize()
keyPairs.zip(dirs) { keyPair, dir ->
Files.createDirectories(dir)
val privateKeyFile = "$serviceId-private-key" val privateKeyFile = "$serviceId-private-key"
val publicKeyFile = "$serviceId-public" val publicKeyFile = "$serviceId-public"
notaryParty.writeToFile(dir.resolve(publicKeyFile)) keyPairs.zip(dirs) { keyPair, dir ->
Files.createDirectories(dir)
notaryPartyBytes.writeToFile(dir.resolve(publicKeyFile))
// Use storageKryo as our whitelist is not available in the gradle build environment: // Use storageKryo as our whitelist is not available in the gradle build environment:
keyPair.serialize(storageKryo()).writeToFile(dir.resolve(privateKeyFile)) keyPair.serialize(storageKryo()).writeToFile(dir.resolve(privateKeyFile))
} }
return notaryParty
} }
} }

View File

@ -65,7 +65,7 @@ system.communication.useSignatures = 0
system.communication.useMACs = 1 system.communication.useMACs = 1
#Set to 1 if SMaRt should use the standard output to display debug messages, set to 0 if otherwise #Set to 1 if SMaRt should use the standard output to display debug messages, set to 0 if otherwise
system.debug = 0 system.debug = %s
#Print information about the replica when it is shutdown #Print information about the replica when it is shutdown
system.shutdownhook = true system.shutdownhook = true
@ -109,7 +109,7 @@ system.totalordermulticast.sync_ckp = false
#Replicas ID for the initial view, separated by a comma. #Replicas ID for the initial view, separated by a comma.
# The number of replicas in this parameter should be equal to that specified in 'system.servers.num' # The number of replicas in this parameter should be equal to that specified in 'system.servers.num'
system.initial.view = 0,1,2,3 system.initial.view = %s
#The ID of the trust third party (TTP) #The ID of the trust third party (TTP)
system.ttp.id = 7002 system.ttp.id = 7002

View File

@ -2,9 +2,9 @@ package net.corda.node.services.transactions
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import net.corda.node.services.transactions.BFTSMaRtConfig.Companion.portIsClaimedFormat import net.corda.node.services.transactions.BFTSMaRtConfig.Companion.portIsClaimedFormat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test import org.junit.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class BFTSMaRtConfigTests { class BFTSMaRtConfigTests {
@Test @Test
@ -29,12 +29,12 @@ class BFTSMaRtConfigTests {
@Test @Test
fun `overlapping port ranges are rejected`() { fun `overlapping port ranges are rejected`() {
fun addresses(vararg ports: Int) = ports.map { HostAndPort.fromParts("localhost", it) } fun addresses(vararg ports: Int) = ports.map { HostAndPort.fromParts("localhost", it) }
assertFailsWith(IllegalArgumentException::class, portIsClaimedFormat.format(11001, setOf(11000, 11001))) { assertThatThrownBy { BFTSMaRtConfig(addresses(11000, 11001)).use {} }
BFTSMaRtConfig(addresses(11000, 11001)).use {} .isInstanceOf(IllegalArgumentException::class.java)
} .hasMessage(portIsClaimedFormat.format("localhost:11001", setOf("localhost:11000", "localhost:11001")))
assertFailsWith(IllegalArgumentException::class, portIsClaimedFormat.format(11001, setOf(11001, 11002))) { assertThatThrownBy { BFTSMaRtConfig(addresses(11001, 11000)).use {} }
BFTSMaRtConfig(addresses(11001, 11000)).use {} .isInstanceOf(IllegalArgumentException::class.java)
} .hasMessage(portIsClaimedFormat.format("localhost:11001", setOf("localhost:11001", "localhost:11002", "localhost:11000")))
BFTSMaRtConfig(addresses(11000, 11002)).use {} // Non-overlapping. BFTSMaRtConfig(addresses(11000, 11002)).use {} // Non-overlapping.
} }
} }

View File

@ -2,6 +2,7 @@ package net.corda.testing.node
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors.listeningDecorator
import net.corda.core.* import net.corda.core.*
import net.corda.core.crypto.X509Utilities import net.corda.core.crypto.X509Utilities
import net.corda.core.crypto.appendToCommonName import net.corda.core.crypto.appendToCommonName
@ -58,8 +59,8 @@ abstract class NodeBasedTest {
*/ */
@After @After
fun stopAllNodes() { fun stopAllNodes() {
val shutdownExecutor = Executors.newScheduledThreadPool(1) val shutdownExecutor = listeningDecorator(Executors.newScheduledThreadPool(nodes.size))
nodes.forEach(Node::stop) Futures.allAsList(nodes.map { shutdownExecutor.submit(it::stop) }).getOrThrow()
// Wait until ports are released // Wait until ports are released
val portNotBoundChecks = nodes.flatMap { val portNotBoundChecks = nodes.flatMap {
listOf( listOf(