BFT notary demo (#725)

* Rename raft-notary-demo project to notary-demo
* Refactor serialisation filtering to allow BFT SMaRt to work, it no longer relies on the jdk.serialFilter system property
* In NodeBasedTest remove whitespace in node directory names for consistency with cordform and driver
This commit is contained in:
Andrzej Cichocki
2017-05-24 12:25:06 +01:00
committed by GitHub
parent 375392d32d
commit bbe4c170c2
46 changed files with 636 additions and 319 deletions

View File

@ -53,7 +53,6 @@ class BootTests {
class ObjectInputStreamFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
System.clearProperty("jdk.serialFilter") // This checks that the node has already consumed the property.
val data = ByteArrayOutputStream().apply { ObjectOutputStream(this).use { it.writeObject(object : Serializable {}) } }.toByteArray()
ObjectInputStream(data.inputStream()).use { it.readObject() }
}

View File

@ -5,19 +5,19 @@ import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionType
import net.corda.core.crypto.appendToCommonName
import net.corda.core.crypto.commonName
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.ALICE
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.Node
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.minCorrectReplicas
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.node.utilities.transaction
import net.corda.testing.node.NodeBasedTest
@ -28,71 +28,55 @@ import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class BFTNotaryServiceTests : NodeBasedTest() {
private companion object {
val notaryCommonName = X500Name("CN=BFT Notary Server,O=R3,OU=corda,L=Zurich,C=CH")
fun buildNodeName(it: Int, notaryName: X500Name): X500Name {
return notaryName.appendToCommonName("-$it")
}
}
@Test
fun `detect double spend`() {
val masterNode = startBFTNotaryCluster(notaryCommonName, 4, BFTNonValidatingNotaryService.type).first()
val clusterName = X500Name("CN=BFT,O=R3,OU=corda,L=Zurich,C=CH")
startBFTNotaryCluster(clusterName, 4, BFTNonValidatingNotaryService.type)
val alice = startNode(ALICE.name).getOrThrow()
val notaryParty = alice.netMapCache.getNotary(notaryCommonName)!!
val notaryParty = alice.netMapCache.getNotary(clusterName)!!
val inputState = issueState(alice, notaryParty)
val firstTxBuilder = TransactionType.General.Builder(notaryParty).withItems(inputState)
val firstSpendTx = alice.services.signInitialTransaction(firstTxBuilder)
val firstSpend = alice.services.startFlow(NotaryFlow.Client(firstSpendTx))
firstSpend.resultFuture.getOrThrow()
val secondSpendBuilder = TransactionType.General.Builder(notaryParty).withItems(inputState).run {
val dummyState = DummyContract.SingleOwnerState(0, alice.info.legalIdentity)
addOutputState(dummyState)
this
alice.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture.getOrThrow()
val secondSpendBuilder = TransactionType.General.Builder(notaryParty).withItems(inputState).also {
it.addOutputState(DummyContract.SingleOwnerState(0, alice.info.legalIdentity))
}
val secondSpendTx = alice.services.signInitialTransaction(secondSpendBuilder)
val secondSpend = alice.services.startFlow(NotaryFlow.Client(secondSpendTx))
val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() }
val ex = assertFailsWith(NotaryException::class) {
secondSpend.resultFuture.getOrThrow()
}
val error = ex.error as NotaryError.Conflict
assertEquals(error.txId, secondSpendTx.id)
}
private fun issueState(node: AbstractNode, notary: Party): StateAndRef<*> {
return node.database.transaction {
val builder = DummyContract.generateInitial(Random().nextInt(), notary, node.info.legalIdentity.ref(0))
val stx = node.services.signInitialTransaction(builder)
node.services.recordTransactions(listOf(stx))
private fun issueState(node: AbstractNode, notary: Party) = node.run {
database.transaction {
val builder = DummyContract.generateInitial(Random().nextInt(), notary, info.legalIdentity.ref(0))
val stx = services.signInitialTransaction(builder)
services.recordTransactions(listOf(stx))
StateAndRef(builder.outputStates().first(), StateRef(stx.id, 0))
}
}
private fun startBFTNotaryCluster(notaryName: X500Name,
private fun startBFTNotaryCluster(clusterName: X500Name,
clusterSize: Int,
serviceType: ServiceType): List<Node> {
serviceType: ServiceType) {
require(clusterSize > 0)
val quorum = (2 * clusterSize + 1) / 3
val replicaNames = (0 until clusterSize).map { DUMMY_NOTARY.name.appendToCommonName(" $it") }
ServiceIdentityGenerator.generateToDisk(
(0 until clusterSize).map { tempFolder.root.toPath() / "${notaryName.commonName}-$it" },
replicaNames.map { baseDirectory(it) },
serviceType.id,
notaryName,
quorum)
val serviceInfo = ServiceInfo(serviceType, notaryName)
val nodes = (0 until clusterSize).map {
clusterName,
minCorrectReplicas(clusterSize))
val serviceInfo = ServiceInfo(serviceType, clusterName)
val notaryClusterAddresses = (0 until clusterSize).map { "localhost:${11000 + it * 10}" }
(0 until clusterSize).forEach {
startNode(
buildNodeName(it, notaryName),
replicaNames[it],
advertisedServices = setOf(serviceInfo),
configOverrides = mapOf("notaryNodeId" to it)
configOverrides = mapOf("bftReplicaId" to it, "notaryClusterAddresses" to notaryClusterAddresses)
).getOrThrow()
}
return nodes
}
}

View File

@ -3,8 +3,6 @@ package net.corda.services.messaging
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.*
import net.corda.core.crypto.X509Utilities
import net.corda.core.crypto.commonName
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.DEFAULT_SESSION_ID
@ -64,10 +62,8 @@ class P2PMessagingTest : NodeBasedTest() {
// TODO Use a dummy distributed service
@Test
fun `communicating with a distributed service which the network map node is part of`() {
val root = tempFolder.root.toPath()
ServiceIdentityGenerator.generateToDisk(
listOf(root / DUMMY_MAP.name.commonName, root / SERVICE_2_NAME.commonName),
listOf(DUMMY_MAP.name, SERVICE_2_NAME).map { baseDirectory(it) },
RaftValidatingNotaryService.type.id,
DISTRIBUTED_SERVICE_NAME)

View File

@ -2,8 +2,6 @@ package net.corda.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.crypto.X509Utilities
import net.corda.core.crypto.commonName
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
@ -60,7 +58,7 @@ class P2PSecurityTest : NodeBasedTest() {
private fun startSimpleNode(legalName: X500Name): SimpleNode {
val config = TestNodeConfiguration(
baseDirectory = tempFolder.root.toPath() / legalName.commonName,
baseDirectory = baseDirectory(legalName),
myLegalName = legalName,
networkMapService = NetworkMapInfo(networkMapNode.configuration.p2pAddress, networkMapNode.info.legalIdentity.name))
config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name

View File

@ -10,10 +10,10 @@ import net.corda.core.crypto.commonName
import net.corda.core.crypto.orgName
import net.corda.core.node.VersionInfo
import net.corda.core.utilities.Emoji
import net.corda.core.utilities.LogHelper.withLevel
import net.corda.node.internal.Node
import net.corda.node.internal.enforceSingleNodeIsRunning
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.transactions.bftSMaRtSerialFilter
import net.corda.node.shell.InteractiveShell
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
import net.corda.node.utilities.registration.NetworkRegistrationHelper
@ -21,7 +21,6 @@ import org.fusesource.jansi.Ansi
import org.fusesource.jansi.AnsiConsole
import org.slf4j.LoggerFactory
import org.slf4j.bridge.SLF4JBridgeHandler
import java.io.*
import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.nio.file.Paths
@ -72,8 +71,6 @@ fun main(args: Array<String>) {
enforceSingleNodeIsRunning(cmdlineOptions.baseDirectory)
initLogging(cmdlineOptions)
disableJavaDeserialization() // Should be after initLogging to avoid TMI.
// Manifest properties are only available if running from the corda jar
fun manifestValue(name: String): String? = if (Manifests.exists(name)) Manifests.read(name) else null
@ -107,7 +104,7 @@ fun main(args: Array<String>) {
println("Unable to load the configuration file: ${e.rootCause.message}")
exitProcess(2)
}
SerialFilter.install(if (conf.bftReplicaId != null) ::bftSMaRtSerialFilter else ::defaultSerialFilter)
if (cmdlineOptions.isRegistration) {
println()
println("******************************************************************")
@ -208,29 +205,12 @@ private fun assertCanNormalizeEmptyPath() {
}
}
private fun failStartUp(message: String): Nothing {
internal fun failStartUp(message: String): Nothing {
println(message)
println("Corda will now exit...")
exitProcess(1)
}
private fun disableJavaDeserialization() {
// ObjectInputFilter and friends are in java.io in Java 9 but sun.misc in backports, so we are using the system property interface for portability.
// This property has already been set in the Capsule. Anywhere else may be too late, but we'll repeat it here for developers.
System.setProperty("jdk.serialFilter", "maxbytes=0")
// Attempt at deserialization so that ObjectInputFilter (permanently) inits itself:
val data = ByteArrayOutputStream().apply { ObjectOutputStream(this).use { it.writeObject(object : Serializable {}) } }.toByteArray()
try {
withLevel("java.io.serialization", "WARN") {
ObjectInputStream(data.inputStream()).use { it.readObject() } // Logs REJECTED at INFO, which we don't want users to see.
}
// JDK 8u121 is the earliest JDK8 JVM that supports this functionality.
failStartUp("Corda forbids Java deserialisation. Please upgrade to at least JDK 8u121 and set system property 'jdk.serialFilter' to 'maxbytes=0' when booting Corda.")
} catch (e: InvalidClassException) {
// Good, our system property is honoured.
}
}
private fun printPluginsAndServices(node: Node) {
node.configuration.extraAdvertisedServiceIds.let {
if (it.isNotEmpty()) printBasicNodeInfo("Providing network services", it.joinToString())

View File

@ -0,0 +1,62 @@
package net.corda.node
import java.lang.reflect.Field
import java.lang.reflect.Method
import java.lang.reflect.Proxy
internal object SerialFilter {
private val filterInterface: Class<*>
private val serialClassGetter: Method
private val undecided: Any
private val rejected: Any
private val serialFilterLock: Any
private val serialFilterField: Field
init {
// ObjectInputFilter and friends are in java.io in Java 9 but sun.misc in backports:
fun getFilterInterface(packageName: String): Class<*>? {
return try {
Class.forName("$packageName.ObjectInputFilter")
} catch (e: ClassNotFoundException) {
null
}
}
// JDK 8u121 is the earliest JDK8 JVM that supports this functionality.
filterInterface = getFilterInterface("java.io")
?: getFilterInterface("sun.misc")
?: failStartUp("Corda forbids Java deserialisation. Please upgrade to at least JDK 8u121.")
serialClassGetter = Class.forName("${filterInterface.name}\$FilterInfo").getMethod("serialClass")
val statusEnum = Class.forName("${filterInterface.name}\$Status")
undecided = statusEnum.getField("UNDECIDED").get(null)
rejected = statusEnum.getField("REJECTED").get(null)
val configClass = Class.forName("${filterInterface.name}\$Config")
serialFilterLock = configClass.getDeclaredField("serialFilterLock").also { it.isAccessible = true }.get(null)
serialFilterField = configClass.getDeclaredField("serialFilter").also { it.isAccessible = true }
}
internal fun install(acceptClass: (Class<*>) -> Boolean) {
val filter = Proxy.newProxyInstance(javaClass.classLoader, arrayOf(filterInterface)) { _, _, args ->
val serialClass = serialClassGetter.invoke(args[0]) as Class<*>?
if (applyPredicate(acceptClass, serialClass)) {
undecided
} else {
rejected
}
}
// Can't simply use the setter as in non-trampoline mode Capsule has inited the filter in premain:
synchronized(serialFilterLock) {
serialFilterField.set(null, filter)
}
}
internal fun applyPredicate(acceptClass: (Class<*>) -> Boolean, serialClass: Class<*>?): Boolean {
// Similar logic to jdk.serialFilter, our concern is side-effects at deserialisation time:
if (null == serialClass) return true
var componentType: Class<*> = serialClass
while (componentType.isArray) componentType = componentType.componentType
if (componentType.isPrimitive) return true
return acceptClass(componentType)
}
}
internal fun defaultSerialFilter(@Suppress("UNUSED_PARAMETER") clazz: Class<*>) = false

View File

@ -32,6 +32,8 @@ import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.config.parseAs
import net.corda.cordform.CordformNode
import net.corda.cordform.CordformContext
import net.corda.core.internal.ShutdownHook
import net.corda.core.internal.addShutdownHook
import okhttp3.OkHttpClient
import okhttp3.Request
import org.bouncycastle.asn1.x500.X500Name
@ -236,22 +238,19 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
coerce: (D) -> DI,
dsl: DI.() -> A
): A {
var shutdownHook: Thread? = null
var shutdownHook: ShutdownHook? = null
try {
driverDsl.start()
shutdownHook = Thread({
shutdownHook = addShutdownHook {
driverDsl.shutdown()
})
Runtime.getRuntime().addShutdownHook(shutdownHook)
}
return dsl(coerce(driverDsl))
} catch (exception: Throwable) {
log.error("Driver shutting down because of exception", exception)
throw exception
} finally {
driverDsl.shutdown()
if (shutdownHook != null) {
Runtime.getRuntime().removeShutdownHook(shutdownHook)
}
shutdownHook?.cancel()
}
}
@ -558,21 +557,19 @@ class DriverDSL(
verifierType: VerifierType,
rpcUsers: List<User>
): ListenableFuture<Pair<Party, List<NodeHandle>>> {
val nodeNames = (1..clusterSize).map { DUMMY_NOTARY.name.appendToCommonName(it.toString()) }
val nodeNames = (0 until clusterSize).map { DUMMY_NOTARY.name.appendToCommonName(" $it") }
val paths = nodeNames.map { baseDirectory(it) }
ServiceIdentityGenerator.generateToDisk(paths, type.id, notaryName)
val serviceInfo = ServiceInfo(type, notaryName)
val advertisedService = setOf(serviceInfo)
val advertisedServices = setOf(ServiceInfo(type, notaryName))
val notaryClusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster
val firstNotaryFuture = startNode(nodeNames.first(), advertisedService, rpcUsers, verifierType, mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
val firstNotaryFuture = startNode(nodeNames.first(), advertisedServices, rpcUsers, verifierType, mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
// All other nodes will join the cluster
val restNotaryFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort()
val configOverride = mapOf("notaryNodeAddress" to nodeAddress.toString(), "notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))
startNode(it, advertisedService, rpcUsers, verifierType, configOverride)
startNode(it, advertisedServices, rpcUsers, verifierType, configOverride)
}
return firstNotaryFuture.flatMap { firstNotary ->

View File

@ -61,6 +61,7 @@ import org.jetbrains.exposed.sql.Database
import org.slf4j.Logger
import java.io.IOException
import java.lang.reflect.Modifier.*
import java.net.InetAddress
import java.net.URL
import java.nio.file.FileAlreadyExistsException
import java.nio.file.Path
@ -518,10 +519,11 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(timestampChecker, uniquenessProvider as RaftUniquenessProvider)
RaftValidatingNotaryService.type -> RaftValidatingNotaryService(timestampChecker, uniquenessProvider as RaftUniquenessProvider)
BFTNonValidatingNotaryService.type -> with(configuration as FullNodeConfiguration) {
val nodeId = notaryNodeId ?: throw IllegalArgumentException("notaryNodeId value must be specified in the configuration")
val client = BFTSMaRt.Client(nodeId)
tokenizableServices += client
BFTNonValidatingNotaryService(services, timestampChecker, nodeId, database, client)
val replicaId = bftReplicaId ?: throw IllegalArgumentException("bftReplicaId value must be specified in the configuration")
BFTSMaRtConfig(notaryClusterAddresses).use { config ->
val client = BFTSMaRt.Client(config, replicaId).also { tokenizableServices += it } // (Ab)use replicaId for clientId.
BFTNonValidatingNotaryService(config, services, timestampChecker, replicaId, database, client)
}
}
else -> {
throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.")

View File

@ -1,7 +1,7 @@
package net.corda.node.internal
import net.corda.core.internal.addShutdownHook
import net.corda.core.div
import net.corda.core.utilities.loggerFor
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.nio.file.Path
@ -26,9 +26,9 @@ fun enforceSingleNodeIsRunning(baseDirectory: Path) {
}
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
// when our process shuts down, but we try in stop() anyway just to be nice.
Runtime.getRuntime().addShutdownHook(Thread {
addShutdownHook {
pidFileLock.release()
})
}
val ourProcessID: String = ManagementFactory.getRuntimeMXBean().name.split("@")[0]
pidFileRw.setLength(0)
pidFileRw.write(ourProcessID.toByteArray())

View File

@ -5,16 +5,15 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.flatMap
import net.corda.core.*
import net.corda.core.internal.ShutdownHook
import net.corda.core.internal.addShutdownHook
import net.corda.core.messaging.RPCOps
import net.corda.core.minutes
import net.corda.core.node.ServiceHub
import net.corda.core.node.VersionInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.seconds
import net.corda.core.success
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.printBasicNodeInfo
@ -47,7 +46,6 @@ import java.io.IOException
import java.time.Clock
import java.util.*
import javax.management.ObjectName
import kotlin.concurrent.thread
/**
* A Node manages a standalone server that takes part in the P2P network. It creates the services found in [ServiceHub],
@ -112,7 +110,7 @@ class Node(override val configuration: FullNodeConfiguration,
var messageBroker: ArtemisMessagingServer? = null
private var shutdownThread: Thread? = null
private var shutdownHook: ShutdownHook? = null
private lateinit var userService: RPCUserService
@ -295,12 +293,9 @@ class Node(override val configuration: FullNodeConfiguration,
(startupComplete as SettableFuture<Unit>).set(Unit)
}
shutdownThread = thread(start = false) {
shutdownHook = addShutdownHook {
stop()
}
Runtime.getRuntime().addShutdownHook(shutdownThread)
return this
}
@ -322,12 +317,9 @@ class Node(override val configuration: FullNodeConfiguration,
synchronized(this) {
if (shutdown) return
shutdown = true
// Unregister shutdown hook to prevent any unnecessary second calls to stop
if ((shutdownThread != null) && (Thread.currentThread() != shutdownThread)) {
Runtime.getRuntime().removeShutdownHook(shutdownThread)
shutdownThread = null
}
shutdownHook?.cancel()
shutdownHook = null
}
printBasicNodeInfo("Shutting down ...")

View File

@ -62,7 +62,7 @@ data class FullNodeConfiguration(
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
val messagingServerAddress: HostAndPort?,
val extraAdvertisedServiceIds: List<String>,
val notaryNodeId: Int?,
val bftReplicaId: Int?,
val notaryNodeAddress: HostAndPort?,
val notaryClusterAddresses: List<HostAndPort>,
override val certificateChainCheckPolicies: List<CertChainPolicyConfig>,

View File

@ -14,6 +14,7 @@ import net.corda.core.utilities.unwrap
import net.corda.flows.NotaryException
import net.corda.node.services.api.ServiceHubInternal
import org.jetbrains.exposed.sql.Database
import java.nio.file.Path
import kotlin.concurrent.thread
/**
@ -21,14 +22,18 @@ import kotlin.concurrent.thread
*
* A transaction is notarised when the consensus is reached by the cluster on its uniqueness, and timestamp validity.
*/
class BFTNonValidatingNotaryService(services: ServiceHubInternal,
class BFTNonValidatingNotaryService(config: BFTSMaRtConfig,
services: ServiceHubInternal,
timestampChecker: TimestampChecker,
serverId: Int,
db: Database,
val client: BFTSMaRt.Client) : NotaryService {
private val client: BFTSMaRt.Client) : NotaryService {
init {
val configHandle = config.handle()
thread(name = "BFTSmartServer-$serverId", isDaemon = true) {
Server(serverId, db, "bft_smart_notary_committed_states", services, timestampChecker)
configHandle.use {
Server(configHandle.path, serverId, db, "bft_smart_notary_committed_states", services, timestampChecker)
}
}
}
@ -62,11 +67,12 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal,
}
}
private class Server(id: Int,
private class Server(configHome: Path,
id: Int,
db: Database,
tableName: String,
services: ServiceHubInternal,
timestampChecker: TimestampChecker) : BFTSMaRt.Server(id, db, tableName, services, timestampChecker) {
timestampChecker: TimestampChecker) : BFTSMaRt.Server(configHome, id, db, tableName, services, timestampChecker) {
override fun executeCommand(command: ByteArray): ByteArray {
val request = command.deserialize<BFTSMaRt.CommitRequest>()

View File

@ -32,6 +32,7 @@ import net.corda.node.services.transactions.BFTSMaRt.Server
import net.corda.node.utilities.JDBCHashMap
import net.corda.node.utilities.transaction
import org.jetbrains.exposed.sql.Database
import java.nio.file.Path
import java.util.*
/**
@ -66,13 +67,17 @@ object BFTSMaRt {
data class Signatures(val txSignatures: List<DigitalSignature>) : ClusterResponse()
}
class Client(val id: Int) : SingletonSerializeAsToken() {
class Client(config: BFTSMaRtConfig, private val clientId: Int) : SingletonSerializeAsToken() {
private val configHandle = config.handle()
companion object {
private val log = loggerFor<Client>()
}
/** A proxy for communicating with the BFT cluster */
private val proxy: ServiceProxy by lazy { buildProxy() }
private val proxy: ServiceProxy by lazy {
configHandle.use { buildProxy(it.path) }
}
/**
* Sends a transaction commit request to the BFT cluster. The [proxy] will deliver the request to every
@ -86,10 +91,10 @@ object BFTSMaRt {
return response
}
private fun buildProxy(): ServiceProxy {
private fun buildProxy(configHome: Path): ServiceProxy {
val comparator = buildResponseComparator()
val extractor = buildExtractor()
return ServiceProxy(id, "bft-smart-config", comparator, extractor)
return ServiceProxy(clientId, configHome.toString(), comparator, extractor)
}
/** A comparator to check if replies from two replicas are the same. */
@ -111,7 +116,7 @@ object BFTSMaRt {
val accepted = responses.filterIsInstance<ReplicaResponse.Signature>()
val rejected = responses.filterIsInstance<ReplicaResponse.Error>()
log.debug { "BFT Client $id: number of replicas accepted the commit: ${accepted.size}, rejected: ${rejected.size}" }
log.debug { "BFT Client $clientId: number of replicas accepted the commit: ${accepted.size}, rejected: ${rejected.size}" }
// TODO: only return an aggregate if the majority of signatures are replies
// TODO: return an error reported by the majority and not just the first one
@ -137,7 +142,8 @@ object BFTSMaRt {
* The validation logic can be specified by implementing the [executeCommand] method.
*/
@Suppress("LeakingThis")
abstract class Server(val id: Int,
abstract class Server(configHome: Path,
val replicaId: Int,
val db: Database,
tableName: String,
val services: ServiceHubInternal,
@ -152,7 +158,7 @@ object BFTSMaRt {
init {
// TODO: Looks like this statement is blocking. Investigate the bft-smart node startup.
ServiceReplica(id, "bft-smart-config", this, this, null, DefaultReplier())
ServiceReplica(replicaId, configHome.toString(), this, this, null, DefaultReplier())
}
override fun appExecuteUnordered(command: ByteArray, msgCtx: MessageContext): ByteArray? {

View File

@ -0,0 +1,61 @@
package net.corda.node.services.transactions
import com.google.common.net.HostAndPort
import net.corda.core.div
import java.io.FileWriter
import java.io.PrintWriter
import java.net.InetAddress
import java.nio.file.Files
/**
* BFT SMaRt can only be configured via files in a configHome directory.
* Each instance of this class creates such a configHome, accessible via [path].
* The files are deleted on [close] typically via [use], see [PathManager] for details.
*/
class BFTSMaRtConfig(replicaAddresses: List<HostAndPort>) : PathManager(Files.createTempDirectory("bft-smart-config")) {
companion object {
internal val portIsClaimedFormat = "Port %s is claimed by another replica: %s"
}
init {
val claimedPorts = mutableSetOf<Int>()
replicaAddresses.map { it.port }.forEach { base ->
// Each replica claims the configured port and the next one:
(0..1).map { base + it }.forEach { port ->
claimedPorts.add(port) || throw IllegalArgumentException(portIsClaimedFormat.format(port, claimedPorts))
}
}
configWriter("hosts.config") {
replicaAddresses.forEachIndexed { index, address ->
// The documentation strongly recommends IP addresses:
println("${index} ${InetAddress.getByName(address.host).hostAddress} ${address.port}")
}
}
val n = replicaAddresses.size
val systemConfig = String.format(javaClass.getResource("system.config.printf").readText(), n, maxFaultyReplicas(n))
configWriter("system.config") {
print(systemConfig)
}
}
private fun configWriter(name: String, block: PrintWriter.() -> Unit) {
// Default charset, consistent with loaders:
FileWriter((path / name).toFile()).use {
PrintWriter(it).use {
it.run(block)
}
}
}
}
fun maxFaultyReplicas(clusterSize: Int) = (clusterSize - 1) / 3
fun minCorrectReplicas(clusterSize: Int) = (2 * clusterSize + 3) / 3
fun minClusterSize(maxFaultyReplicas: Int) = maxFaultyReplicas * 3 + 1
fun bftSMaRtSerialFilter(clazz: Class<*>): Boolean = clazz.name.let {
it.startsWith("bftsmart.")
|| it.startsWith("java.security.")
|| it.startsWith("java.util.")
|| it.startsWith("java.lang.")
|| it.startsWith("java.net.")
}

View File

@ -0,0 +1,43 @@
package net.corda.node.services.transactions
import net.corda.core.internal.addShutdownHook
import java.io.Closeable
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicInteger
internal class DeleteOnExitPath(internal val path: Path) {
private val shutdownHook = addShutdownHook { dispose() }
internal fun dispose() {
path.toFile().deleteRecursively()
shutdownHook.cancel()
}
}
open class PathHandle internal constructor(private val deleteOnExitPath: DeleteOnExitPath, private val handleCounter: AtomicInteger) : Closeable {
val path
get(): Path {
val path = deleteOnExitPath.path
check(handleCounter.get() != 0) { "Defunct path: $path" }
return path
}
init {
handleCounter.incrementAndGet()
}
fun handle() = PathHandle(deleteOnExitPath, handleCounter)
override fun close() {
if (handleCounter.decrementAndGet() == 0) {
deleteOnExitPath.dispose()
}
}
}
/**
* An instance of this class is a handle on a temporary [path].
* If necessary, additional handles on the same path can be created using the [handle] method.
* The path is (recursively) deleted when [close] is called on the last handle, typically at the end of a [use] expression.
* The value of eager cleanup of temporary files is that there are cases when shutdown hooks don't run e.g. SIGKILL.
*/
open class PathManager(path: Path) : PathHandle(DeleteOnExitPath(path), AtomicInteger())

View File

@ -0,0 +1,118 @@
# Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
############################################
####### Communication Configurations #######
############################################
#HMAC algorithm used to authenticate messages between processes (HmacMD5 is the default value)
#This parameter is not currently being used being used
#system.authentication.hmacAlgorithm = HmacSHA1
#Specify if the communication system should use a thread to send data (true or false)
system.communication.useSenderThread = true
#Force all processes to use the same public/private keys pair and secret key. This is useful when deploying experiments
#and benchmarks, but must not be used in production systems.
system.communication.defaultkeys = true
############################################
### Replication Algorithm Configurations ###
############################################
#Number of servers in the group
system.servers.num = %s
#Maximum number of faulty replicas
system.servers.f = %s
#Timeout to asking for a client request
system.totalordermulticast.timeout = 2000
#Maximum batch size (in number of messages)
system.totalordermulticast.maxbatchsize = 400
#Number of nonces (for non-determinism actions) generated
system.totalordermulticast.nonces = 10
#if verification of leader-generated timestamps are increasing
#it can only be used on systems in which the network clocks
#are synchronized
system.totalordermulticast.verifyTimestamps = false
#Quantity of messages that can be stored in the receive queue of the communication system
system.communication.inQueueSize = 500000
# Quantity of messages that can be stored in the send queue of each replica
system.communication.outQueueSize = 500000
#Set to 1 if SMaRt should use signatures, set to 0 if otherwise
system.communication.useSignatures = 0
#Set to 1 if SMaRt should use MAC's, set to 0 if otherwise
system.communication.useMACs = 1
#Set to 1 if SMaRt should use the standard output to display debug messages, set to 0 if otherwise
system.debug = 0
#Print information about the replica when it is shutdown
system.shutdownhook = true
############################################
###### State Transfer Configurations #######
############################################
#Activate the state transfer protocol ('true' to activate, 'false' to de-activate)
system.totalordermulticast.state_transfer = false
#Maximum ahead-of-time message not discarded
system.totalordermulticast.highMark = 10000
#Maximum ahead-of-time message not discarded when the replica is still on EID 0 (after which the state transfer is triggered)
system.totalordermulticast.revival_highMark = 10
#Number of ahead-of-time messages necessary to trigger the state transfer after a request timeout occurs
system.totalordermulticast.timeout_highMark = 200
############################################
###### Log and Checkpoint Configurations ###
############################################
system.totalordermulticast.log = false
system.totalordermulticast.log_parallel = false
system.totalordermulticast.log_to_disk = false
system.totalordermulticast.sync_log = false
#Period at which BFT-SMaRt requests the state to the application (for the state transfer state protocol)
system.totalordermulticast.checkpoint_period = 1
system.totalordermulticast.global_checkpoint_period = 1
system.totalordermulticast.checkpoint_to_disk = false
system.totalordermulticast.sync_ckp = false
############################################
###### Reconfiguration Configurations ######
############################################
#Replicas ID for the initial view, separated by a comma.
# The number of replicas in this parameter should be equal to that specified in 'system.servers.num'
system.initial.view = 0,1,2,3
#The ID of the trust third party (TTP)
system.ttp.id = 7002
#This sets if the system will function in Byzantine or crash-only mode. Set to "true" to support Byzantine faults
system.bft = true

View File

@ -0,0 +1,31 @@
package net.corda.node
import org.junit.Test
import java.io.IOException
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.test.fail
class SerialFilterTests {
@Test
fun `null and primitives are accepted and arrays are unwrapped`() {
val acceptClass = { _: Class<*> -> fail("Should not be invoked.") }
listOf(null, Byte::class.javaPrimitiveType, IntArray::class.java, Array<CharArray>::class.java).forEach {
assertTrue(SerialFilter.applyPredicate(acceptClass, it))
}
}
@Test
fun `the predicate is applied to the componentType`() {
val classes = mutableListOf<Class<*>>()
val acceptClass = { clazz: Class<*> ->
classes.add(clazz)
false
}
listOf(String::class.java, Array<Unit>::class.java, Array<Array<IOException>>::class.java).forEach {
assertFalse(SerialFilter.applyPredicate(acceptClass, it))
}
assertEquals(listOf<Class<*>>(String::class.java, Unit::class.java, IOException::class.java), classes)
}
}

View File

@ -5,6 +5,7 @@ import net.corda.core.crypto.commonName
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.utilities.ALICE
import net.corda.core.utilities.WHITESPACE
import net.corda.testing.node.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
@ -12,11 +13,8 @@ import org.junit.Test
class NodeTest : NodeBasedTest() {
@Test
fun `empty plugins directory`() {
val baseDirectory = tempFolder.root.toPath() / ALICE.name.commonName
val baseDirectory = baseDirectory(ALICE.name)
(baseDirectory / "plugins").createDirectories()
val node = startNode(ALICE.name).getOrThrow()
// Make sure we created the plugins dir in the correct place
assertThat(baseDirectory).isEqualTo(node.configuration.baseDirectory)
startNode(ALICE.name).getOrThrow()
}
}
}

View File

@ -0,0 +1,40 @@
package net.corda.node.services.transactions
import com.google.common.net.HostAndPort
import net.corda.node.services.transactions.BFTSMaRtConfig.Companion.portIsClaimedFormat
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class BFTSMaRtConfigTests {
@Test
fun `replica arithmetic`() {
(1..20).forEach { n ->
assertEquals(n, maxFaultyReplicas(n) + minCorrectReplicas(n))
}
(1..3).forEach { n -> assertEquals(0, maxFaultyReplicas(n)) }
(4..6).forEach { n -> assertEquals(1, maxFaultyReplicas(n)) }
(7..9).forEach { n -> assertEquals(2, maxFaultyReplicas(n)) }
10.let { n -> assertEquals(3, maxFaultyReplicas(n)) }
}
@Test
fun `min cluster size`() {
assertEquals(1, minClusterSize(0))
assertEquals(4, minClusterSize(1))
assertEquals(7, minClusterSize(2))
assertEquals(10, minClusterSize(3))
}
@Test
fun `overlapping port ranges are rejected`() {
fun addresses(vararg ports: Int) = ports.map { HostAndPort.fromParts("localhost", it) }
assertFailsWith(IllegalArgumentException::class, portIsClaimedFormat.format(11001, setOf(11000, 11001))) {
BFTSMaRtConfig(addresses(11000, 11001)).use {}
}
assertFailsWith(IllegalArgumentException::class, portIsClaimedFormat.format(11001, setOf(11001, 11002))) {
BFTSMaRtConfig(addresses(11001, 11000)).use {}
}
BFTSMaRtConfig(addresses(11000, 11002)).use {} // Non-overlapping.
}
}

View File

@ -0,0 +1,32 @@
package net.corda.node.services.transactions
import net.corda.core.exists
import org.junit.Test
import java.nio.file.Files
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class PathManagerTests {
@Test
fun `path deleted when manager closed`() {
val manager = PathManager(Files.createTempFile(javaClass.simpleName, null))
val leakedPath = manager.use {
it.path.also { assertTrue(it.exists()) }
}
assertFalse(leakedPath.exists())
assertFailsWith(IllegalStateException::class) { manager.path }
}
@Test
fun `path deleted when handle closed`() {
val handle = PathManager(Files.createTempFile(javaClass.simpleName, null)).use {
it.handle()
}
val leakedPath = handle.use {
it.path.also { assertTrue(it.exists()) }
}
assertFalse(leakedPath.exists())
assertFailsWith(IllegalStateException::class) { handle.path }
}
}