diff --git a/core/src/main/kotlin/net/corda/flows/NonValidatingNotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/NonValidatingNotaryFlow.kt index ed32c033dc..e9a9f17c2b 100644 --- a/core/src/main/kotlin/net/corda/flows/NonValidatingNotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NonValidatingNotaryFlow.kt @@ -19,7 +19,7 @@ class NonValidatingNotaryFlow(otherSide: Party, * undo the commit of the input states (the exact mechanism still needs to be worked out). */ @Suspendable - override fun receiveAndVerifyTx(): NotaryFlow.Service.TransactionParts { + override fun receiveAndVerifyTx(): TransactionParts { val ftx = receive(otherSide).unwrap { it.verify() it diff --git a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt index 0878dee1b3..18014d3d82 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt @@ -109,12 +109,6 @@ object NotaryFlow { @Suspendable abstract fun receiveAndVerifyTx(): TransactionParts - /** - * The minimum amount of information needed to notarise a transaction. Note that this does not include - * any sensitive transaction details. - */ - data class TransactionParts(val id: SecureHash, val inputs: List, val timestamp: Timestamp?) - @Suspendable private fun signAndSendResponse(txId: SecureHash) { val signature = sign(txId.bytes) @@ -158,6 +152,12 @@ object NotaryFlow { } } +/** + * The minimum amount of information needed to notarise a transaction. Note that this does not include + * any sensitive transaction details. + */ +data class TransactionParts(val id: SecureHash, val inputs: List, val timestamp: Timestamp?) + class NotaryException(val error: NotaryError) : FlowException() { override fun toString() = "${super.toString()}: Error response from Notary - $error" } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt new file mode 100644 index 0000000000..e54c15cc24 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt @@ -0,0 +1,100 @@ +package net.corda.node.services + +import net.corda.core.contracts.DummyContract +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionType +import net.corda.core.crypto.Party +import net.corda.core.div +import net.corda.core.getOrThrow +import net.corda.core.node.services.ServiceInfo +import net.corda.core.node.services.ServiceType +import net.corda.flows.NotaryError +import net.corda.flows.NotaryException +import net.corda.flows.NotaryFlow +import net.corda.node.internal.AbstractNode +import net.corda.node.internal.Node +import net.corda.node.services.transactions.BFTNonValidatingNotaryService +import net.corda.node.utilities.ServiceIdentityGenerator +import net.corda.node.utilities.databaseTransaction +import net.corda.testing.node.NodeBasedTest +import org.junit.Test +import java.security.KeyPair +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class BFTNotaryServiceTests : NodeBasedTest() { + private val notaryName = "BFT Notary Server" + + @Test + fun `detect double spend`() { + val masterNode = startBFTNotaryCluster(notaryName, 4, BFTNonValidatingNotaryService.type).first() + val alice = startNode("Alice").getOrThrow() + + val notaryParty = alice.netMapCache.getNotary(notaryName)!! + val notaryNodeKeyPair = databaseTransaction(masterNode.database) { masterNode.services.notaryIdentityKey } + val aliceKey = databaseTransaction(alice.database) { alice.services.legalIdentityKey } + + val inputState = issueState(alice, notaryParty, notaryNodeKeyPair) + + val firstSpendTx = TransactionType.General.Builder(notaryParty).withItems(inputState).run { + signWith(aliceKey) + toSignedTransaction(false) + } + + val firstSpend = alice.services.startFlow(NotaryFlow.Client(firstSpendTx)) + firstSpend.resultFuture.getOrThrow() + + val secondSpendTx = TransactionType.General.Builder(notaryParty).withItems(inputState).run { + val dummyState = DummyContract.SingleOwnerState(0, alice.info.legalIdentity.owningKey) + addOutputState(dummyState) + signWith(aliceKey) + toSignedTransaction(false) + } + val secondSpend = alice.services.startFlow(NotaryFlow.Client(secondSpendTx)) + + val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() } + val error = ex.error as NotaryError.Conflict + assertEquals(error.txId, secondSpendTx.id) + } + + private fun issueState(node: AbstractNode, notary: Party, notaryKey: KeyPair): StateAndRef<*> { + return databaseTransaction(node.database) { + val tx = DummyContract.generateInitial(Random().nextInt(), notary, node.info.legalIdentity.ref(0)) + tx.signWith(node.services.legalIdentityKey) + tx.signWith(notaryKey) + val stx = tx.toSignedTransaction() + node.services.recordTransactions(listOf(stx)) + StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0)) + } + } + + private fun startBFTNotaryCluster(notaryName: String, + clusterSize: Int, + serviceType: ServiceType): List { + val quorum = (2 * clusterSize + 1) / 3 + ServiceIdentityGenerator.generateToDisk( + (0 until clusterSize).map { tempFolder.root.toPath() / "$notaryName-$it" }, + serviceType.id, + notaryName, + quorum) + + val serviceInfo = ServiceInfo(serviceType, notaryName) + val masterNode = startNode( + "$notaryName-0", + advertisedServices = setOf(serviceInfo), + configOverrides = mapOf("notaryNodeId" to 0) + ).getOrThrow() + + val remainingNodes = (1 until clusterSize).map { + startNode( + "$notaryName-$it", + advertisedServices = setOf(serviceInfo), + configOverrides = mapOf("notaryNodeId" to it) + ).getOrThrow() + } + + return remainingNodes + masterNode + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index dbcf24d300..7b09aa960a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -25,6 +25,7 @@ import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction import net.corda.flows.* import net.corda.node.services.api.* +import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.events.NodeSchedulerService @@ -440,6 +441,12 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, SimpleNotaryService.type -> SimpleNotaryService(services, timestampChecker, uniquenessProvider) ValidatingNotaryService.type -> ValidatingNotaryService(services, timestampChecker, uniquenessProvider) RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services, timestampChecker, uniquenessProvider as RaftUniquenessProvider) + BFTNonValidatingNotaryService.type -> with(configuration as FullNodeConfiguration) { + val nodeId = notaryNodeId ?: throw IllegalArgumentException("notaryNodeId value must be specified in the configuration") + val client = BFTSMaRt.Client(nodeId) + tokenizableServices.add(client) + BFTNonValidatingNotaryService(services, timestampChecker, nodeId, database, client) + } else -> { throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.") } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 2b740f3dd7..2cb8452750 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -25,11 +25,7 @@ import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.RaftUniquenessProvider import net.corda.node.services.transactions.RaftValidatingNotaryService -import net.corda.node.services.transactions.BFTSmartUniquenessProvider -import net.corda.node.services.transactions.BFTValidatingNotaryService import net.corda.node.utilities.AffinityExecutor -import net.corda.node.utilities.databaseTransaction -import org.jetbrains.exposed.sql.Database import java.io.RandomAccessFile import java.lang.management.ManagementFactory import java.nio.channels.FileLock @@ -164,9 +160,6 @@ class Node(override val configuration: FullNodeConfiguration, RaftValidatingNotaryService.type -> with(configuration) { RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration) } - BFTValidatingNotaryService.type -> with(configuration) { - BFTSmartUniquenessProvider(notaryNodeAddress!!, notaryClusterAddresses, database) - } else -> PersistentUniquenessProvider() } } diff --git a/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt b/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt index 9ad88565d5..84972ba5a2 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt @@ -47,11 +47,12 @@ object ConfigHelper { } } -@Suppress("UNCHECKED_CAST") +@Suppress("UNCHECKED_CAST", "PLATFORM_CLASS_MAPPED_TO_KOTLIN") operator fun Config.getValue(receiver: Any, metadata: KProperty<*>): T { return when (metadata.returnType.javaType) { String::class.java -> getString(metadata.name) as T Int::class.java -> getInt(metadata.name) as T + Integer::class.java -> getInt(metadata.name) as T Long::class.java -> getLong(metadata.name) as T Double::class.java -> getDouble(metadata.name) as T Boolean::class.java -> getBoolean(metadata.name) as T diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 44ae971d6a..405ee32429 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -71,6 +71,7 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config val messagingServerAddress: HostAndPort? by config.getOrElse { null } val extraAdvertisedServiceIds: List = config.getListOrElse("extraAdvertisedServiceIds") { emptyList() } val useTestClock: Boolean by config.getOrElse { false } + val notaryNodeId: Int? by config.getOrElse { null } val notaryNodeAddress: HostAndPort? by config.getOrElse { null } val notaryClusterAddresses: List = config .getListOrElse("notaryClusterAddresses") { emptyList() } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt new file mode 100644 index 0000000000..128370e7b3 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt @@ -0,0 +1,93 @@ +package net.corda.node.services.transactions + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.crypto.DigitalSignature +import net.corda.core.crypto.Party +import net.corda.core.flows.FlowLogic +import net.corda.core.node.services.TimestampChecker +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.transactions.FilteredTransaction +import net.corda.core.utilities.debug +import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.unwrap +import net.corda.flows.NotaryException +import net.corda.node.services.api.ServiceHubInternal +import org.jetbrains.exposed.sql.Database +import kotlin.concurrent.thread + +/** + * A non-validating notary service operated by a group of parties that don't necessarily trust each other. + * + * A transaction is notarised when the consensus is reached by the cluster on its uniqueness, and timestamp validity. + */ +class BFTNonValidatingNotaryService(services: ServiceHubInternal, + timestampChecker: TimestampChecker, + serverId: Int, + db: Database, + val client: BFTSMaRt.Client) : NotaryService(services) { + init { + thread(name = "BFTSmartServer-$serverId", isDaemon = true) { + Server(serverId, db, "bft_smart_notary_committed_states", services, timestampChecker) + } + } + + companion object { + val type = SimpleNotaryService.type.getSubType("bft") + private val log = loggerFor() + } + + override fun createFlow(otherParty: Party) = ServiceFlow(otherParty, client) + + class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic() { + @Suspendable + override fun call(): Void? { + val stx = receive(otherSide).unwrap { it } + val signatures = commit(stx) + send(otherSide, signatures) + return null + } + + private fun commit(stx: FilteredTransaction): List { + val response = client.commitTransaction(stx, otherSide) + when (response) { + is BFTSMaRt.ClusterResponse.Error -> throw NotaryException(response.error) + is BFTSMaRt.ClusterResponse.Signatures -> { + log.debug("All input states of transaction ${stx.rootHash} have been committed") + return response.txSignatures + } + } + } + } + + class Server(id: Int, + db: Database, + tableName: String, + services: ServiceHubInternal, + timestampChecker: TimestampChecker) : BFTSMaRt.Server(id, db, tableName, services, timestampChecker) { + + override fun executeCommand(command: ByteArray): ByteArray { + val request = command.deserialize() + val ftx = request.tx as FilteredTransaction + val response = verifyAndCommitTx(ftx, request.callerIdentity) + return response.serialize().bytes + } + + fun verifyAndCommitTx(ftx: FilteredTransaction, callerIdentity: Party): BFTSMaRt.ReplicaResponse { + return try { + val id = ftx.rootHash + val inputs = ftx.filteredLeaves.inputs + + validateTimestamp(ftx.filteredLeaves.timestamp) + commitInputStates(inputs, id, callerIdentity) + + log.debug { "Inputs committed successfully, signing $id" } + val sig = sign(id.bytes) + BFTSMaRt.ReplicaResponse.Signature(sig) + } catch (e: NotaryException) { + log.debug { "Error processing transaction: ${e.error}" } + BFTSMaRt.ReplicaResponse.Error(e.error) + } + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt new file mode 100644 index 0000000000..7581506578 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt @@ -0,0 +1,224 @@ +package net.corda.node.services.transactions + +import bftsmart.tom.MessageContext +import bftsmart.tom.ServiceProxy +import bftsmart.tom.ServiceReplica +import bftsmart.tom.core.messages.TOMMessage +import bftsmart.tom.server.defaultservices.DefaultRecoverable +import bftsmart.tom.server.defaultservices.DefaultReplier +import bftsmart.tom.util.Extractor +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.Timestamp +import net.corda.core.crypto.* +import net.corda.core.node.services.TimestampChecker +import net.corda.core.node.services.UniquenessProvider +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.transactions.FilteredTransaction +import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.debug +import net.corda.core.utilities.loggerFor +import net.corda.flows.NotaryError +import net.corda.flows.NotaryException +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.transactions.BFTSMaRt.Client +import net.corda.node.services.transactions.BFTSMaRt.Server +import net.corda.node.utilities.JDBCHashMap +import net.corda.node.utilities.databaseTransaction +import org.jetbrains.exposed.sql.Database +import java.util.* + +/** + * Implements a replicated transaction commit log based on the [BFT-SMaRt](https://github.com/bft-smart/library) + * consensus algorithm. Every replica in the cluster is running a [Server] maintaining the state, and a[Client] is used + * to to relay state modification requests to all [Server]s. + */ +// TODO: Write bft-smart host config file based on Corda node configuration. +// TODO: Define and document the configuration of the bft-smart cluster. +// TODO: Potentially update the bft-smart API for our use case or rebuild client and server from lower level building +// blocks bft-smart provides. +// TODO: Support cluster membership changes. This requires reading about reconfiguration of bft-smart clusters and +// perhaps a design doc. In general, it seems possible to use the state machine to reconfigure the cluster (reaching +// consensus about membership changes). Nodes that join the cluster for the first time or re-join can go through +// a "recovering" state and request missing data from their peers. +object BFTSMaRt { + /** Sent from [Client] to [Server]. */ + @CordaSerializable + data class CommitRequest(val tx: Any, val callerIdentity: Party) + + /** Sent from [Server] to [Client]. */ + @CordaSerializable + sealed class ReplicaResponse { + class Error(val error: NotaryError) : ReplicaResponse() + class Signature(val txSignature: DigitalSignature) : ReplicaResponse() + } + + /** An aggregate response from all replica ([Server]) replies sent from [Client] back to the calling application. */ + @CordaSerializable + sealed class ClusterResponse { + class Error(val error: NotaryError) : ClusterResponse() + class Signatures(val txSignatures: List) : ClusterResponse() + } + + class Client(val id: Int) : SingletonSerializeAsToken() { + companion object { + private val log = loggerFor() + } + + /** A proxy for communicating with the BFT cluster */ + private val proxy: ServiceProxy by lazy { buildProxy() } + + /** + * Sends a transaction commit request to the BFT cluster. The [proxy] will deliver the request to every + * replica, and block until a sufficient number of replies are received. + */ + fun commitTransaction(transaction: Any, otherSide: Party): ClusterResponse { + require(transaction is FilteredTransaction || transaction is SignedTransaction) { "Unsupported transaction type: ${transaction.javaClass.name}" } + val request = CommitRequest(transaction, otherSide) + val responseBytes = proxy.invokeOrdered(request.serialize().bytes) + val response = responseBytes.deserialize() + return response + } + + private fun buildProxy(): ServiceProxy { + val comparator = buildResponseComparator() + val extractor = buildExtractor() + return ServiceProxy(id, "bft-smart-config", comparator, extractor) + } + + /** A comparator to check if replies from two replicas are the same. */ + private fun buildResponseComparator(): Comparator { + return Comparator { o1, o2 -> + val reply1 = o1.deserialize() + val reply2 = o2.deserialize() + if (reply1 is ReplicaResponse.Error && reply2 is ReplicaResponse.Error) { + // TODO: for now we treat all errors as equal, compare by error type as well + 0 + } else if (reply1 is ReplicaResponse.Signature && reply2 is ReplicaResponse.Signature) 0 else -1 + } + } + + /** An extractor to build the final response message for the client application from all received replica replies. */ + private fun buildExtractor(): Extractor { + return Extractor { replies, sameContent, lastReceived -> + val responses = replies.mapNotNull { it?.content?.deserialize() } + val accepted = responses.filterIsInstance() + val rejected = responses.filterIsInstance() + + log.debug { "BFT Client $id: number of replicas accepted the commit: ${accepted.size}, rejected: ${rejected.size}" } + + // TODO: only return an aggregate if the majority of signatures are replies + // TODO: return an error reported by the majority and not just the first one + val aggregateResponse = if (accepted.isNotEmpty()) { + log.debug { "Cluster response - signatures: ${accepted.map { it.txSignature }}" } + ClusterResponse.Signatures(accepted.map { it.txSignature }) + } else { + log.debug { "Cluster response - error: ${rejected.first().error}" } + ClusterResponse.Error(rejected.first().error) + } + + val messageContent = aggregateResponse.serialize().bytes + // TODO: is it safe use the last message for sender/session/sequence info + val reply = replies[lastReceived] + TOMMessage(reply.sender, reply.session, reply.sequence, messageContent, reply.viewID) + } + } + } + + /** + * Maintains the commit log and executes commit commands received from the [Client]. + * + * The validation logic can be specified by implementing the [executeCommand] method. + */ + @Suppress("LeakingThis") + abstract class Server(val id: Int, + val db: Database, + tableName: String, + val services: ServiceHubInternal, + val timestampChecker: TimestampChecker) : DefaultRecoverable() { + companion object { + private val log = loggerFor() + } + + // TODO: Use Requery with proper DB schema instead of JDBCHashMap. + // Must be initialised before ServiceReplica is started + val commitLog = databaseTransaction(db) { JDBCHashMap(tableName) } + + init { + // TODO: Looks like this statement is blocking. Investigate the bft-smart node startup. + ServiceReplica(id, "bft-smart-config", this, this, null, DefaultReplier()) + } + + override fun appExecuteUnordered(command: ByteArray, msgCtx: MessageContext): ByteArray? { + throw NotImplementedError("No unordered operations supported") + } + + override fun appExecuteBatch(command: Array, mcs: Array): Array { + val replies = command.zip(mcs) { c, m -> + executeCommand(c) + } + return replies.toTypedArray() + } + + /** + * Implement logic to execute the command and commit the transaction to the log. + * Helper methods are provided for transaction processing: [commitInputStates], [validateTimestamp], and [sign]. + */ + abstract fun executeCommand(command: ByteArray): ByteArray? + + protected fun commitInputStates(states: List, txId: SecureHash, callerIdentity: Party) { + log.debug { "Attempting to commit inputs for transaction: $txId" } + val conflicts = mutableMapOf() + databaseTransaction(db) { + states.forEach { state -> + commitLog[state]?.let { conflicts[state] = it } + } + if (conflicts.isEmpty()) { + log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" } + states.forEachIndexed { i, stateRef -> + val txInfo = UniquenessProvider.ConsumingTx(txId, i, callerIdentity) + commitLog.put(stateRef, txInfo) + } + } else { + log.debug { "Conflict detected – the following inputs have already been committed: ${conflicts.keys.joinToString()}" } + val conflict = UniquenessProvider.Conflict(conflicts) + val conflictData = conflict.serialize() + val signedConflict = SignedData(conflictData, sign(conflictData.bytes)) + throw NotaryException(NotaryError.Conflict(txId, signedConflict)) + } + } + } + + protected fun validateTimestamp(t: Timestamp?) { + if (t != null && !timestampChecker.isValid(t)) + throw NotaryException(NotaryError.TimestampInvalid()) + } + + protected fun sign(bytes: ByteArray): DigitalSignature.WithKey { + val mySigningKey = databaseTransaction(db) { services.notaryIdentityKey } + return mySigningKey.signWithECDSA(bytes) + } + + // TODO: + // - Test snapshot functionality with different bft-smart cluster configurations. + // - Add streaming to support large data sets. + override fun getSnapshot(): ByteArray { + // LinkedHashMap for deterministic serialisation + val m = LinkedHashMap() + databaseTransaction(db) { + commitLog.forEach { m[it.key] = it.value } + } + return m.serialize().bytes + } + + override fun installSnapshot(bytes: ByteArray) { + val m = bytes.deserialize>() + databaseTransaction(db) { + commitLog.clear() + commitLog.putAll(m) + } + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSmartUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSmartUniquenessProvider.kt deleted file mode 100644 index 0468c90436..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSmartUniquenessProvider.kt +++ /dev/null @@ -1,77 +0,0 @@ -package net.corda.node.services.transactions - -import com.google.common.net.HostAndPort -import net.corda.core.contracts.StateRef -import net.corda.core.crypto.Party -import net.corda.core.crypto.SecureHash -import net.corda.core.node.services.UniquenessException -import net.corda.core.node.services.UniquenessProvider -import net.corda.core.utilities.loggerFor -import org.jetbrains.exposed.sql.Database -import kotlin.concurrent.thread - -/** - * A [UniquenessProvider] based on the [bft-smart library](https://github.com/bft-smart/library). - * - * Experimental, not ready for production yet. - * - * A [BFTSmartUniquenessProvider] starts a [BFTSmartServer] that joins the notary cluster and stores committed input - * states and a [BFTSmartClient] to commit states to the notary cluster. - * - * @param clusterAddresses the addresses of all BFTSmartUniquenessProviders of the notary cluster - * @param myAddress the address of this uniqueness provider, must be listed in clusterAddresses - */ -class BFTSmartUniquenessProvider(val myAddress: HostAndPort, val clusterAddresses: List, val db: Database) : UniquenessProvider { - // TODO: Write bft-smart host config file based on Corda node configuration. - // TODO: Define and document the configuration of the bft-smart cluster. - - // TODO: Potentially update the bft-smart API for our use case or rebuild client and server from lower level building - // blocks bft-smart provides. - - // TODO: Support cluster membership changes. This requires reading about reconfiguration of bft-smart clusters and - // perhaps a design doc. In general, it seems possible to use the state machine to reconfigure the cluster (reaching - // consensus about membership changes). Nodes that join the cluster for the first time or re-join can go through - // a "recovering" state and request missing data from their peers. - - init { - require(myAddress in clusterAddresses) { - "expected myAddress '$myAddress' to be listed in clusterAddresses '$clusterAddresses'" - } - startServerThread() - } - - companion object { - private val log = loggerFor() - } - - private val bftClient = BFTSmartClient(clientId()) - - /** Throws UniquenessException if conflict is detected */ - override fun commit(states: List, txId: SecureHash, callerIdentity: Party) { - val entries = states.mapIndexed { i, stateRef -> - stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity) - }.toMap() - val conflicts = bftClient.put(entries) - if (conflicts.isNotEmpty()) { - throw UniquenessException(UniquenessProvider.Conflict(conflicts)) - } - log.debug("All input states of transaction $txId have been committed") - } - - private fun serverId(): Int { - return clusterAddresses.indexOf(myAddress) - } - - private fun clientId(): Int { - // 10k IDs are reserved for servers. - require(clusterAddresses.size <= 10000) - return 10000 + serverId() - } - - private fun startServerThread() { - val id = serverId() - thread(name="BFTSmartServer-$id", isDaemon=true) { - BFTSmartServer(id, db, "bft_smart_notary_committed_states") - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTValidatingNotaryService.kt deleted file mode 100644 index 6cc764d45b..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTValidatingNotaryService.kt +++ /dev/null @@ -1,26 +0,0 @@ -package net.corda.node.services.transactions - -import net.corda.core.crypto.Party -import net.corda.core.node.services.TimestampChecker -import net.corda.flows.ValidatingNotaryFlow -import net.corda.node.services.api.ServiceHubInternal - -/** - * A validating notary service operated by a group of parties that don't necessarily trust each other. - * - * To validate a transaction, this service collects proofs that the transaction has been validated and committed by a - * specified number of notary nodes. - * - * Based on the [bft-smart library](https://github.com/bft-smart/library). - */ -class BFTValidatingNotaryService(services: ServiceHubInternal, - val timestampChecker: TimestampChecker, - val uniquenessProvider: BFTSmartUniquenessProvider) : NotaryService(services) { - companion object { - val type = ValidatingNotaryService.type.getSubType("bft") - } - - override fun createFlow(otherParty: Party): ValidatingNotaryFlow { - return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider) - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableBFTMap.kt b/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableBFTMap.kt deleted file mode 100644 index 2ad2235979..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableBFTMap.kt +++ /dev/null @@ -1,120 +0,0 @@ -package net.corda.node.services.transactions - -import bftsmart.tom.MessageContext -import bftsmart.tom.ServiceProxy -import bftsmart.tom.ServiceReplica -import bftsmart.tom.server.defaultservices.DefaultRecoverable -import bftsmart.tom.server.defaultservices.DefaultReplier -import net.corda.core.serialization.CordaSerializable -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize -import net.corda.node.utilities.JDBCHashMap -import net.corda.node.utilities.databaseTransaction -import org.jetbrains.exposed.sql.Database -import java.util.* - -@CordaSerializable -enum class RequestType { - Get, - Put -} - -/** Sent from [BFTSmartClient] to [BFTSmartServer] */ -@CordaSerializable -data class Request(val type: RequestType, val data: Any) - -class BFTSmartClient(id: Int) { - - val clientProxy = ServiceProxy(id, "bft-smart-config") - - /** - * Returns conflicts as a map of conflicting keys and their stored values or an empty map if all entries are - * committed without conflicts. - */ - fun put(entries: Map): Map { - val request = Request(RequestType.Put, entries) - val responseBytes = clientProxy.invokeOrdered(request.serialize().bytes) - return responseBytes.deserialize>() - } - - /** Returns the value associated with the key or null if no value is stored under the key. */ - fun get(key: K): V? { - val request = Request(RequestType.Get, key) - val responseBytes = clientProxy.invokeUnordered(request.serialize().bytes) ?: return null - return responseBytes.deserialize() - } -} - -class BFTSmartServer(val id: Int, val db: Database, tableName: String) : DefaultRecoverable() { - // TODO: Exception handling when processing client input. - - // TODO: Use Requery with proper DB schema instead of JDBCHashMap. - val table = databaseTransaction(db) { JDBCHashMap(tableName) } - - // TODO: Looks like this statement is blocking. Investigate the bft-smart node startup. - val replica = ServiceReplica(id, "bft-smart-config", this, this, null, DefaultReplier()) - - @Suppress("UNUSED_PARAMETER") - override fun appExecuteUnordered(command: ByteArray, msgCtx: MessageContext): ByteArray? { - // TODO: collect signatures from msgCtx - val request = command.deserialize() - when (request.type) { - RequestType.Get -> { - val v = databaseTransaction(db) { table[request.data] } ?: return null - return v.serialize().bytes - } - else -> { - throw Exception("Unhandled request type: ${request.type}") - } - } - } - - override fun appExecuteBatch(command: Array, mcs: Array): Array { - val replies = command.zip(mcs) { c, m -> - executeSingle(c, m) - } - return replies.toTypedArray() - } - - @Suppress("UNUSED_PARAMETER") - private fun executeSingle(command: ByteArray, msgCtx: MessageContext): ByteArray? { - // TODO: collect signatures from msgCtx - val request = command.deserialize() - val conflicts = mutableMapOf() - when (request.type) { - RequestType.Put -> { - @Suppress("UNCHECKED_CAST") - val m = request.data as Map - databaseTransaction(db) { - for (k in m.keys) table[k]?.let { conflicts[k] = it } - if (conflicts.isEmpty()) table.putAll(m) - } - } - else -> { - throw Exception("Unhandled request type: ${request.type}") - } - } - return conflicts.serialize().bytes - } - - // TODO: - // - Test snapshot functionality with different bft-smart cluster configurations. - // - Add streaming to support large data sets. - override fun getSnapshot(): ByteArray { - // LinkedHashMap for deterministic serialisation - // TODO: Simply use an array of pairs. - val m = LinkedHashMap() - databaseTransaction(db) { - table.forEach { m[it.key] = it.value } - } - return m.serialize().bytes - } - - override fun installSnapshot(bytes: ByteArray) { - val m = bytes.deserialize>() - databaseTransaction(db) { - table.clear() - table.putAll(m) - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt index 66410033cb..9b8233a8fd 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt @@ -1,6 +1,7 @@ package net.corda.node.services.transactions import net.corda.core.crypto.Party +import net.corda.core.flows.FlowLogic import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.flows.NotaryFlow import net.corda.node.services.api.ServiceHubInternal @@ -21,6 +22,6 @@ abstract class NotaryService(services: ServiceHubInternal) : SingletonSerializeA } /** Implement a factory that specifies the transaction commit flow for the notary service to use */ - abstract fun createFlow(otherParty: Party): NotaryFlow.Service + abstract fun createFlow(otherParty: Party): FlowLogic } diff --git a/node/src/main/kotlin/net/corda/node/utilities/ServiceIdentityGenerator.kt b/node/src/main/kotlin/net/corda/node/utilities/ServiceIdentityGenerator.kt index c4a1981952..9b6b311425 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/ServiceIdentityGenerator.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/ServiceIdentityGenerator.kt @@ -45,7 +45,8 @@ fun main(args: Array) { val dirs = args[0].split(",").map { Paths.get(it) } val serviceId = args[1] val serviceName = args[2] + val quorumSize = args.getOrNull(3)?.toInt() ?: 1 println("Generating service identity for \"$serviceName\"") - ServiceIdentityGenerator.generateToDisk(dirs, serviceId, serviceName) + ServiceIdentityGenerator.generateToDisk(dirs, serviceId, serviceName, quorumSize) } \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/DistributedImmutableBFTMapTests.kt b/node/src/test/kotlin/net/corda/node/services/DistributedImmutableBFTMapTests.kt deleted file mode 100644 index 7a4362a5e2..0000000000 --- a/node/src/test/kotlin/net/corda/node/services/DistributedImmutableBFTMapTests.kt +++ /dev/null @@ -1,68 +0,0 @@ -package net.corda.node.services - -import net.corda.node.services.transactions.BFTSmartClient -import net.corda.node.services.transactions.BFTSmartServer -import net.corda.node.utilities.configureDatabase -import net.corda.testing.node.makeTestDataSourceProperties -import org.jetbrains.exposed.sql.Database -import org.junit.After -import org.junit.Before -import org.junit.Ignore -import org.junit.Test -import java.io.Closeable -import kotlin.concurrent.thread -import kotlin.test.assertEquals - -class DistributedImmutableBFTMapTests { - - // TODO: Setup Corda cluster instead of starting server threads (see DistributedImmutableMapTests). - - lateinit var dataSource: Closeable - lateinit var database: Database - - @Before - fun setup() { - val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties()) - dataSource = dataSourceAndDatabase.first - database = dataSourceAndDatabase.second - } - - @After - fun tearDown() { - dataSource.close() - } - - @Test @Ignore - fun `stores entries correctly and detects conflicts`() { - (0..3).forEach { i -> - thread { BFTSmartServer(i, database, "bft_notary_committed_states_$i") }.apply { Thread.sleep(500) } - } - - Thread.sleep(3000) // TODO: Get notified when the servers are ready. - - val client = BFTSmartClient(1001) - - val m = mapOf("a" to "b", "c" to "d", "e" to "f") - - val conflicts = client.put(m) - assertEquals(mapOf(), conflicts) - - for ((k, v) in m) { - val r = client.get(k) - assertEquals(v, r) - } - - val conflicts2 = client.put(mapOf("a" to "b2")) - assertEquals(mapOf("a" to "b"), conflicts2) - - // Values are not mutated. - for ((k, v) in m) { - val r = client.get(k) - assertEquals(v, r) - } - - // Null response encodes 'not found'. - val r = client.get("x") - assertEquals(null, r) - } -}