BFT notary prototype: add validation and signature collection (#279)

* BFT notary prototype: add a non-validating service.
Each replica now validates the transaction timestamp and returns an individual signature to the BFT client. The client then returns a list of signatures back to the notary service flow.

The validating variant is still incomplete - it requires the ability to suspend flows on arbitrary function calls.
This commit is contained in:
Andrius Dagys 2017-03-07 12:39:19 +00:00 committed by GitHub
parent 0a5080a4e4
commit 907a893ca1
15 changed files with 438 additions and 308 deletions

View File

@ -19,7 +19,7 @@ class NonValidatingNotaryFlow(otherSide: Party,
* undo the commit of the input states (the exact mechanism still needs to be worked out).
*/
@Suspendable
override fun receiveAndVerifyTx(): NotaryFlow.Service.TransactionParts {
override fun receiveAndVerifyTx(): TransactionParts {
val ftx = receive<FilteredTransaction>(otherSide).unwrap {
it.verify()
it

View File

@ -109,12 +109,6 @@ object NotaryFlow {
@Suspendable
abstract fun receiveAndVerifyTx(): TransactionParts
/**
* The minimum amount of information needed to notarise a transaction. Note that this does not include
* any sensitive transaction details.
*/
data class TransactionParts(val id: SecureHash, val inputs: List<StateRef>, val timestamp: Timestamp?)
@Suspendable
private fun signAndSendResponse(txId: SecureHash) {
val signature = sign(txId.bytes)
@ -158,6 +152,12 @@ object NotaryFlow {
}
}
/**
* The minimum amount of information needed to notarise a transaction. Note that this does not include
* any sensitive transaction details.
*/
data class TransactionParts(val id: SecureHash, val inputs: List<StateRef>, val timestamp: Timestamp?)
class NotaryException(val error: NotaryError) : FlowException() {
override fun toString() = "${super.toString()}: Error response from Notary - $error"
}

View File

@ -0,0 +1,100 @@
package net.corda.node.services
import net.corda.core.contracts.DummyContract
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionType
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.Node
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.NodeBasedTest
import org.junit.Test
import java.security.KeyPair
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class BFTNotaryServiceTests : NodeBasedTest() {
private val notaryName = "BFT Notary Server"
@Test
fun `detect double spend`() {
val masterNode = startBFTNotaryCluster(notaryName, 4, BFTNonValidatingNotaryService.type).first()
val alice = startNode("Alice").getOrThrow()
val notaryParty = alice.netMapCache.getNotary(notaryName)!!
val notaryNodeKeyPair = databaseTransaction(masterNode.database) { masterNode.services.notaryIdentityKey }
val aliceKey = databaseTransaction(alice.database) { alice.services.legalIdentityKey }
val inputState = issueState(alice, notaryParty, notaryNodeKeyPair)
val firstSpendTx = TransactionType.General.Builder(notaryParty).withItems(inputState).run {
signWith(aliceKey)
toSignedTransaction(false)
}
val firstSpend = alice.services.startFlow(NotaryFlow.Client(firstSpendTx))
firstSpend.resultFuture.getOrThrow()
val secondSpendTx = TransactionType.General.Builder(notaryParty).withItems(inputState).run {
val dummyState = DummyContract.SingleOwnerState(0, alice.info.legalIdentity.owningKey)
addOutputState(dummyState)
signWith(aliceKey)
toSignedTransaction(false)
}
val secondSpend = alice.services.startFlow(NotaryFlow.Client(secondSpendTx))
val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() }
val error = ex.error as NotaryError.Conflict
assertEquals(error.txId, secondSpendTx.id)
}
private fun issueState(node: AbstractNode, notary: Party, notaryKey: KeyPair): StateAndRef<*> {
return databaseTransaction(node.database) {
val tx = DummyContract.generateInitial(Random().nextInt(), notary, node.info.legalIdentity.ref(0))
tx.signWith(node.services.legalIdentityKey)
tx.signWith(notaryKey)
val stx = tx.toSignedTransaction()
node.services.recordTransactions(listOf(stx))
StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
}
}
private fun startBFTNotaryCluster(notaryName: String,
clusterSize: Int,
serviceType: ServiceType): List<Node> {
val quorum = (2 * clusterSize + 1) / 3
ServiceIdentityGenerator.generateToDisk(
(0 until clusterSize).map { tempFolder.root.toPath() / "$notaryName-$it" },
serviceType.id,
notaryName,
quorum)
val serviceInfo = ServiceInfo(serviceType, notaryName)
val masterNode = startNode(
"$notaryName-0",
advertisedServices = setOf(serviceInfo),
configOverrides = mapOf("notaryNodeId" to 0)
).getOrThrow()
val remainingNodes = (1 until clusterSize).map {
startNode(
"$notaryName-$it",
advertisedServices = setOf(serviceInfo),
configOverrides = mapOf("notaryNodeId" to it)
).getOrThrow()
}
return remainingNodes + masterNode
}
}

View File

@ -25,6 +25,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.*
import net.corda.node.services.api.*
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.events.NodeSchedulerService
@ -440,6 +441,12 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
SimpleNotaryService.type -> SimpleNotaryService(services, timestampChecker, uniquenessProvider)
ValidatingNotaryService.type -> ValidatingNotaryService(services, timestampChecker, uniquenessProvider)
RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services, timestampChecker, uniquenessProvider as RaftUniquenessProvider)
BFTNonValidatingNotaryService.type -> with(configuration as FullNodeConfiguration) {
val nodeId = notaryNodeId ?: throw IllegalArgumentException("notaryNodeId value must be specified in the configuration")
val client = BFTSMaRt.Client(nodeId)
tokenizableServices.add(client)
BFTNonValidatingNotaryService(services, timestampChecker, nodeId, database, client)
}
else -> {
throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.")
}

View File

@ -25,11 +25,7 @@ import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.BFTSmartUniquenessProvider
import net.corda.node.services.transactions.BFTValidatingNotaryService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.nio.channels.FileLock
@ -164,9 +160,6 @@ class Node(override val configuration: FullNodeConfiguration,
RaftValidatingNotaryService.type -> with(configuration) {
RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
}
BFTValidatingNotaryService.type -> with(configuration) {
BFTSmartUniquenessProvider(notaryNodeAddress!!, notaryClusterAddresses, database)
}
else -> PersistentUniquenessProvider()
}
}

View File

@ -47,11 +47,12 @@ object ConfigHelper {
}
}
@Suppress("UNCHECKED_CAST")
@Suppress("UNCHECKED_CAST", "PLATFORM_CLASS_MAPPED_TO_KOTLIN")
operator fun <T> Config.getValue(receiver: Any, metadata: KProperty<*>): T {
return when (metadata.returnType.javaType) {
String::class.java -> getString(metadata.name) as T
Int::class.java -> getInt(metadata.name) as T
Integer::class.java -> getInt(metadata.name) as T
Long::class.java -> getLong(metadata.name) as T
Double::class.java -> getDouble(metadata.name) as T
Boolean::class.java -> getBoolean(metadata.name) as T

View File

@ -71,6 +71,7 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config
val messagingServerAddress: HostAndPort? by config.getOrElse { null }
val extraAdvertisedServiceIds: List<String> = config.getListOrElse<String>("extraAdvertisedServiceIds") { emptyList() }
val useTestClock: Boolean by config.getOrElse { false }
val notaryNodeId: Int? by config.getOrElse { null }
val notaryNodeAddress: HostAndPort? by config.getOrElse { null }
val notaryClusterAddresses: List<HostAndPort> = config
.getListOrElse<String>("notaryClusterAddresses") { emptyList() }

View File

@ -0,0 +1,93 @@
package net.corda.node.services.transactions
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.node.services.TimestampChecker
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.unwrap
import net.corda.flows.NotaryException
import net.corda.node.services.api.ServiceHubInternal
import org.jetbrains.exposed.sql.Database
import kotlin.concurrent.thread
/**
* A non-validating notary service operated by a group of parties that don't necessarily trust each other.
*
* A transaction is notarised when the consensus is reached by the cluster on its uniqueness, and timestamp validity.
*/
class BFTNonValidatingNotaryService(services: ServiceHubInternal,
timestampChecker: TimestampChecker,
serverId: Int,
db: Database,
val client: BFTSMaRt.Client) : NotaryService(services) {
init {
thread(name = "BFTSmartServer-$serverId", isDaemon = true) {
Server(serverId, db, "bft_smart_notary_committed_states", services, timestampChecker)
}
}
companion object {
val type = SimpleNotaryService.type.getSubType("bft")
private val log = loggerFor<BFTNonValidatingNotaryService>()
}
override fun createFlow(otherParty: Party) = ServiceFlow(otherParty, client)
class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic<Void?>() {
@Suspendable
override fun call(): Void? {
val stx = receive<FilteredTransaction>(otherSide).unwrap { it }
val signatures = commit(stx)
send(otherSide, signatures)
return null
}
private fun commit(stx: FilteredTransaction): List<DigitalSignature> {
val response = client.commitTransaction(stx, otherSide)
when (response) {
is BFTSMaRt.ClusterResponse.Error -> throw NotaryException(response.error)
is BFTSMaRt.ClusterResponse.Signatures -> {
log.debug("All input states of transaction ${stx.rootHash} have been committed")
return response.txSignatures
}
}
}
}
class Server(id: Int,
db: Database,
tableName: String,
services: ServiceHubInternal,
timestampChecker: TimestampChecker) : BFTSMaRt.Server(id, db, tableName, services, timestampChecker) {
override fun executeCommand(command: ByteArray): ByteArray {
val request = command.deserialize<BFTSMaRt.CommitRequest>()
val ftx = request.tx as FilteredTransaction
val response = verifyAndCommitTx(ftx, request.callerIdentity)
return response.serialize().bytes
}
fun verifyAndCommitTx(ftx: FilteredTransaction, callerIdentity: Party): BFTSMaRt.ReplicaResponse {
return try {
val id = ftx.rootHash
val inputs = ftx.filteredLeaves.inputs
validateTimestamp(ftx.filteredLeaves.timestamp)
commitInputStates(inputs, id, callerIdentity)
log.debug { "Inputs committed successfully, signing $id" }
val sig = sign(id.bytes)
BFTSMaRt.ReplicaResponse.Signature(sig)
} catch (e: NotaryException) {
log.debug { "Error processing transaction: ${e.error}" }
BFTSMaRt.ReplicaResponse.Error(e.error)
}
}
}
}

View File

@ -0,0 +1,224 @@
package net.corda.node.services.transactions
import bftsmart.tom.MessageContext
import bftsmart.tom.ServiceProxy
import bftsmart.tom.ServiceReplica
import bftsmart.tom.core.messages.TOMMessage
import bftsmart.tom.server.defaultservices.DefaultRecoverable
import bftsmart.tom.server.defaultservices.DefaultReplier
import bftsmart.tom.util.Extractor
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.Timestamp
import net.corda.core.crypto.*
import net.corda.core.node.services.TimestampChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.BFTSMaRt.Client
import net.corda.node.services.transactions.BFTSMaRt.Server
import net.corda.node.utilities.JDBCHashMap
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
import java.util.*
/**
* Implements a replicated transaction commit log based on the [BFT-SMaRt](https://github.com/bft-smart/library)
* consensus algorithm. Every replica in the cluster is running a [Server] maintaining the state, and a[Client] is used
* to to relay state modification requests to all [Server]s.
*/
// TODO: Write bft-smart host config file based on Corda node configuration.
// TODO: Define and document the configuration of the bft-smart cluster.
// TODO: Potentially update the bft-smart API for our use case or rebuild client and server from lower level building
// blocks bft-smart provides.
// TODO: Support cluster membership changes. This requires reading about reconfiguration of bft-smart clusters and
// perhaps a design doc. In general, it seems possible to use the state machine to reconfigure the cluster (reaching
// consensus about membership changes). Nodes that join the cluster for the first time or re-join can go through
// a "recovering" state and request missing data from their peers.
object BFTSMaRt {
/** Sent from [Client] to [Server]. */
@CordaSerializable
data class CommitRequest(val tx: Any, val callerIdentity: Party)
/** Sent from [Server] to [Client]. */
@CordaSerializable
sealed class ReplicaResponse {
class Error(val error: NotaryError) : ReplicaResponse()
class Signature(val txSignature: DigitalSignature) : ReplicaResponse()
}
/** An aggregate response from all replica ([Server]) replies sent from [Client] back to the calling application. */
@CordaSerializable
sealed class ClusterResponse {
class Error(val error: NotaryError) : ClusterResponse()
class Signatures(val txSignatures: List<DigitalSignature>) : ClusterResponse()
}
class Client(val id: Int) : SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<Client>()
}
/** A proxy for communicating with the BFT cluster */
private val proxy: ServiceProxy by lazy { buildProxy() }
/**
* Sends a transaction commit request to the BFT cluster. The [proxy] will deliver the request to every
* replica, and block until a sufficient number of replies are received.
*/
fun commitTransaction(transaction: Any, otherSide: Party): ClusterResponse {
require(transaction is FilteredTransaction || transaction is SignedTransaction) { "Unsupported transaction type: ${transaction.javaClass.name}" }
val request = CommitRequest(transaction, otherSide)
val responseBytes = proxy.invokeOrdered(request.serialize().bytes)
val response = responseBytes.deserialize<ClusterResponse>()
return response
}
private fun buildProxy(): ServiceProxy {
val comparator = buildResponseComparator()
val extractor = buildExtractor()
return ServiceProxy(id, "bft-smart-config", comparator, extractor)
}
/** A comparator to check if replies from two replicas are the same. */
private fun buildResponseComparator(): Comparator<ByteArray> {
return Comparator { o1, o2 ->
val reply1 = o1.deserialize<ReplicaResponse>()
val reply2 = o2.deserialize<ReplicaResponse>()
if (reply1 is ReplicaResponse.Error && reply2 is ReplicaResponse.Error) {
// TODO: for now we treat all errors as equal, compare by error type as well
0
} else if (reply1 is ReplicaResponse.Signature && reply2 is ReplicaResponse.Signature) 0 else -1
}
}
/** An extractor to build the final response message for the client application from all received replica replies. */
private fun buildExtractor(): Extractor {
return Extractor { replies, sameContent, lastReceived ->
val responses = replies.mapNotNull { it?.content?.deserialize<ReplicaResponse>() }
val accepted = responses.filterIsInstance<ReplicaResponse.Signature>()
val rejected = responses.filterIsInstance<ReplicaResponse.Error>()
log.debug { "BFT Client $id: number of replicas accepted the commit: ${accepted.size}, rejected: ${rejected.size}" }
// TODO: only return an aggregate if the majority of signatures are replies
// TODO: return an error reported by the majority and not just the first one
val aggregateResponse = if (accepted.isNotEmpty()) {
log.debug { "Cluster response - signatures: ${accepted.map { it.txSignature }}" }
ClusterResponse.Signatures(accepted.map { it.txSignature })
} else {
log.debug { "Cluster response - error: ${rejected.first().error}" }
ClusterResponse.Error(rejected.first().error)
}
val messageContent = aggregateResponse.serialize().bytes
// TODO: is it safe use the last message for sender/session/sequence info
val reply = replies[lastReceived]
TOMMessage(reply.sender, reply.session, reply.sequence, messageContent, reply.viewID)
}
}
}
/**
* Maintains the commit log and executes commit commands received from the [Client].
*
* The validation logic can be specified by implementing the [executeCommand] method.
*/
@Suppress("LeakingThis")
abstract class Server(val id: Int,
val db: Database,
tableName: String,
val services: ServiceHubInternal,
val timestampChecker: TimestampChecker) : DefaultRecoverable() {
companion object {
private val log = loggerFor<Server>()
}
// TODO: Use Requery with proper DB schema instead of JDBCHashMap.
// Must be initialised before ServiceReplica is started
val commitLog = databaseTransaction(db) { JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(tableName) }
init {
// TODO: Looks like this statement is blocking. Investigate the bft-smart node startup.
ServiceReplica(id, "bft-smart-config", this, this, null, DefaultReplier())
}
override fun appExecuteUnordered(command: ByteArray, msgCtx: MessageContext): ByteArray? {
throw NotImplementedError("No unordered operations supported")
}
override fun appExecuteBatch(command: Array<ByteArray>, mcs: Array<MessageContext>): Array<ByteArray?> {
val replies = command.zip(mcs) { c, m ->
executeCommand(c)
}
return replies.toTypedArray()
}
/**
* Implement logic to execute the command and commit the transaction to the log.
* Helper methods are provided for transaction processing: [commitInputStates], [validateTimestamp], and [sign].
*/
abstract fun executeCommand(command: ByteArray): ByteArray?
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
log.debug { "Attempting to commit inputs for transaction: $txId" }
val conflicts = mutableMapOf<StateRef, UniquenessProvider.ConsumingTx>()
databaseTransaction(db) {
states.forEach { state ->
commitLog[state]?.let { conflicts[state] = it }
}
if (conflicts.isEmpty()) {
log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" }
states.forEachIndexed { i, stateRef ->
val txInfo = UniquenessProvider.ConsumingTx(txId, i, callerIdentity)
commitLog.put(stateRef, txInfo)
}
} else {
log.debug { "Conflict detected the following inputs have already been committed: ${conflicts.keys.joinToString()}" }
val conflict = UniquenessProvider.Conflict(conflicts)
val conflictData = conflict.serialize()
val signedConflict = SignedData(conflictData, sign(conflictData.bytes))
throw NotaryException(NotaryError.Conflict(txId, signedConflict))
}
}
}
protected fun validateTimestamp(t: Timestamp?) {
if (t != null && !timestampChecker.isValid(t))
throw NotaryException(NotaryError.TimestampInvalid())
}
protected fun sign(bytes: ByteArray): DigitalSignature.WithKey {
val mySigningKey = databaseTransaction(db) { services.notaryIdentityKey }
return mySigningKey.signWithECDSA(bytes)
}
// TODO:
// - Test snapshot functionality with different bft-smart cluster configurations.
// - Add streaming to support large data sets.
override fun getSnapshot(): ByteArray {
// LinkedHashMap for deterministic serialisation
val m = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
databaseTransaction(db) {
commitLog.forEach { m[it.key] = it.value }
}
return m.serialize().bytes
}
override fun installSnapshot(bytes: ByteArray) {
val m = bytes.deserialize<LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>>()
databaseTransaction(db) {
commitLog.clear()
commitLog.putAll(m)
}
}
}
}

View File

@ -1,77 +0,0 @@
package net.corda.node.services.transactions
import com.google.common.net.HostAndPort
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.UniquenessException
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.utilities.loggerFor
import org.jetbrains.exposed.sql.Database
import kotlin.concurrent.thread
/**
* A [UniquenessProvider] based on the [bft-smart library](https://github.com/bft-smart/library).
*
* Experimental, not ready for production yet.
*
* A [BFTSmartUniquenessProvider] starts a [BFTSmartServer] that joins the notary cluster and stores committed input
* states and a [BFTSmartClient] to commit states to the notary cluster.
*
* @param clusterAddresses the addresses of all BFTSmartUniquenessProviders of the notary cluster
* @param myAddress the address of this uniqueness provider, must be listed in clusterAddresses
*/
class BFTSmartUniquenessProvider(val myAddress: HostAndPort, val clusterAddresses: List<HostAndPort>, val db: Database) : UniquenessProvider {
// TODO: Write bft-smart host config file based on Corda node configuration.
// TODO: Define and document the configuration of the bft-smart cluster.
// TODO: Potentially update the bft-smart API for our use case or rebuild client and server from lower level building
// blocks bft-smart provides.
// TODO: Support cluster membership changes. This requires reading about reconfiguration of bft-smart clusters and
// perhaps a design doc. In general, it seems possible to use the state machine to reconfigure the cluster (reaching
// consensus about membership changes). Nodes that join the cluster for the first time or re-join can go through
// a "recovering" state and request missing data from their peers.
init {
require(myAddress in clusterAddresses) {
"expected myAddress '$myAddress' to be listed in clusterAddresses '$clusterAddresses'"
}
startServerThread()
}
companion object {
private val log = loggerFor<BFTSmartUniquenessProvider>()
}
private val bftClient = BFTSmartClient<StateRef, UniquenessProvider.ConsumingTx>(clientId())
/** Throws UniquenessException if conflict is detected */
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
val entries = states.mapIndexed { i, stateRef ->
stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity)
}.toMap()
val conflicts = bftClient.put(entries)
if (conflicts.isNotEmpty()) {
throw UniquenessException(UniquenessProvider.Conflict(conflicts))
}
log.debug("All input states of transaction $txId have been committed")
}
private fun serverId(): Int {
return clusterAddresses.indexOf(myAddress)
}
private fun clientId(): Int {
// 10k IDs are reserved for servers.
require(clusterAddresses.size <= 10000)
return 10000 + serverId()
}
private fun startServerThread() {
val id = serverId()
thread(name="BFTSmartServer-$id", isDaemon=true) {
BFTSmartServer<StateRef, UniquenessProvider.ConsumingTx>(id, db, "bft_smart_notary_committed_states")
}
}
}

View File

@ -1,26 +0,0 @@
package net.corda.node.services.transactions
import net.corda.core.crypto.Party
import net.corda.core.node.services.TimestampChecker
import net.corda.flows.ValidatingNotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/**
* A validating notary service operated by a group of parties that don't necessarily trust each other.
*
* To validate a transaction, this service collects proofs that the transaction has been validated and committed by a
* specified number of notary nodes.
*
* Based on the [bft-smart library](https://github.com/bft-smart/library).
*/
class BFTValidatingNotaryService(services: ServiceHubInternal,
val timestampChecker: TimestampChecker,
val uniquenessProvider: BFTSmartUniquenessProvider) : NotaryService(services) {
companion object {
val type = ValidatingNotaryService.type.getSubType("bft")
}
override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
}
}

View File

@ -1,120 +0,0 @@
package net.corda.node.services.transactions
import bftsmart.tom.MessageContext
import bftsmart.tom.ServiceProxy
import bftsmart.tom.ServiceReplica
import bftsmart.tom.server.defaultservices.DefaultRecoverable
import bftsmart.tom.server.defaultservices.DefaultReplier
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.utilities.JDBCHashMap
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
import java.util.*
@CordaSerializable
enum class RequestType {
Get,
Put
}
/** Sent from [BFTSmartClient] to [BFTSmartServer] */
@CordaSerializable
data class Request(val type: RequestType, val data: Any)
class BFTSmartClient<K: Any, V: Any>(id: Int) {
val clientProxy = ServiceProxy(id, "bft-smart-config")
/**
* Returns conflicts as a map of conflicting keys and their stored values or an empty map if all entries are
* committed without conflicts.
*/
fun put(entries: Map<K, V>): Map<K, V> {
val request = Request(RequestType.Put, entries)
val responseBytes = clientProxy.invokeOrdered(request.serialize().bytes)
return responseBytes.deserialize<Map<K, V>>()
}
/** Returns the value associated with the key or null if no value is stored under the key. */
fun get(key: K): V? {
val request = Request(RequestType.Get, key)
val responseBytes = clientProxy.invokeUnordered(request.serialize().bytes) ?: return null
return responseBytes.deserialize<V>()
}
}
class BFTSmartServer<K: Any, V: Any>(val id: Int, val db: Database, tableName: String) : DefaultRecoverable() {
// TODO: Exception handling when processing client input.
// TODO: Use Requery with proper DB schema instead of JDBCHashMap.
val table = databaseTransaction(db) { JDBCHashMap<K, V>(tableName) }
// TODO: Looks like this statement is blocking. Investigate the bft-smart node startup.
val replica = ServiceReplica(id, "bft-smart-config", this, this, null, DefaultReplier())
@Suppress("UNUSED_PARAMETER")
override fun appExecuteUnordered(command: ByteArray, msgCtx: MessageContext): ByteArray? {
// TODO: collect signatures from msgCtx
val request = command.deserialize<Request>()
when (request.type) {
RequestType.Get -> {
val v = databaseTransaction(db) { table[request.data] } ?: return null
return v.serialize().bytes
}
else -> {
throw Exception("Unhandled request type: ${request.type}")
}
}
}
override fun appExecuteBatch(command: Array<ByteArray>, mcs: Array<MessageContext>): Array<ByteArray?> {
val replies = command.zip(mcs) { c, m ->
executeSingle(c, m)
}
return replies.toTypedArray()
}
@Suppress("UNUSED_PARAMETER")
private fun executeSingle(command: ByteArray, msgCtx: MessageContext): ByteArray? {
// TODO: collect signatures from msgCtx
val request = command.deserialize<Request>()
val conflicts = mutableMapOf<K, V>()
when (request.type) {
RequestType.Put -> {
@Suppress("UNCHECKED_CAST")
val m = request.data as Map<K, V>
databaseTransaction(db) {
for (k in m.keys) table[k]?.let { conflicts[k] = it }
if (conflicts.isEmpty()) table.putAll(m)
}
}
else -> {
throw Exception("Unhandled request type: ${request.type}")
}
}
return conflicts.serialize().bytes
}
// TODO:
// - Test snapshot functionality with different bft-smart cluster configurations.
// - Add streaming to support large data sets.
override fun getSnapshot(): ByteArray {
// LinkedHashMap for deterministic serialisation
// TODO: Simply use an array of pairs.
val m = LinkedHashMap<K, V>()
databaseTransaction(db) {
table.forEach { m[it.key] = it.value }
}
return m.serialize().bytes
}
override fun installSnapshot(bytes: ByteArray) {
val m = bytes.deserialize<LinkedHashMap<K, V>>()
databaseTransaction(db) {
table.clear()
table.putAll(m)
}
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.transactions
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.flows.NotaryFlow
import net.corda.node.services.api.ServiceHubInternal
@ -21,6 +22,6 @@ abstract class NotaryService(services: ServiceHubInternal) : SingletonSerializeA
}
/** Implement a factory that specifies the transaction commit flow for the notary service to use */
abstract fun createFlow(otherParty: Party): NotaryFlow.Service
abstract fun createFlow(otherParty: Party): FlowLogic<Void?>
}

View File

@ -45,7 +45,8 @@ fun main(args: Array<String>) {
val dirs = args[0].split(",").map { Paths.get(it) }
val serviceId = args[1]
val serviceName = args[2]
val quorumSize = args.getOrNull(3)?.toInt() ?: 1
println("Generating service identity for \"$serviceName\"")
ServiceIdentityGenerator.generateToDisk(dirs, serviceId, serviceName)
ServiceIdentityGenerator.generateToDisk(dirs, serviceId, serviceName, quorumSize)
}

View File

@ -1,68 +0,0 @@
package net.corda.node.services
import net.corda.node.services.transactions.BFTSmartClient
import net.corda.node.services.transactions.BFTSmartServer
import net.corda.node.utilities.configureDatabase
import net.corda.testing.node.makeTestDataSourceProperties
import org.jetbrains.exposed.sql.Database
import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.io.Closeable
import kotlin.concurrent.thread
import kotlin.test.assertEquals
class DistributedImmutableBFTMapTests {
// TODO: Setup Corda cluster instead of starting server threads (see DistributedImmutableMapTests).
lateinit var dataSource: Closeable
lateinit var database: Database
@Before
fun setup() {
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
}
@After
fun tearDown() {
dataSource.close()
}
@Test @Ignore
fun `stores entries correctly and detects conflicts`() {
(0..3).forEach { i ->
thread { BFTSmartServer<String, String>(i, database, "bft_notary_committed_states_$i") }.apply { Thread.sleep(500) }
}
Thread.sleep(3000) // TODO: Get notified when the servers are ready.
val client = BFTSmartClient<String, String>(1001)
val m = mapOf("a" to "b", "c" to "d", "e" to "f")
val conflicts = client.put(m)
assertEquals(mapOf(), conflicts)
for ((k, v) in m) {
val r = client.get(k)
assertEquals(v, r)
}
val conflicts2 = client.put(mapOf("a" to "b2"))
assertEquals(mapOf("a" to "b"), conflicts2)
// Values are not mutated.
for ((k, v) in m) {
val r = client.get(k)
assertEquals(v, r)
}
// Null response encodes 'not found'.
val r = client.get("x")
assertEquals(null, r)
}
}