mirror of
https://github.com/corda/corda.git
synced 2025-06-18 15:18:16 +00:00
Implement bft-smart notary prototype
This commit is contained in:
committed by
Andrius Dagys
parent
0c58a506a9
commit
99721bf8f1
4
.gitignore
vendored
4
.gitignore
vendored
@ -77,3 +77,7 @@ crashlytics-build.properties
|
|||||||
|
|
||||||
# docs related
|
# docs related
|
||||||
docs/virtualenv/
|
docs/virtualenv/
|
||||||
|
|
||||||
|
# bft-smart
|
||||||
|
node/bft-smart-config/currentView
|
||||||
|
node/config/currentView
|
||||||
|
36
node/bft-smart-config/hosts.config
Normal file
36
node/bft-smart-config/hosts.config
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
# Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# This file defines the replicas ids, IPs and ports.
|
||||||
|
# It is used by the replicas and clients to find connection info
|
||||||
|
# to the initial replicas.
|
||||||
|
# The ports defined here are the ports used by clients to communicate
|
||||||
|
# with the replicas. Additional connections are opened by replicas to
|
||||||
|
# communicate with each other. This additional connection is opened in the
|
||||||
|
# next port defined here. For an example, consider the line "0 127.0.0.1 11000".
|
||||||
|
# That means that clients will open a communication channel to replica 0 in
|
||||||
|
# IP 127.0.0.1 and port 11000. On startup, replicas with id different than 0
|
||||||
|
# will open a communication channel to replica 0 in port 11001.
|
||||||
|
# The same holds for replicas 1, 2, 3 ... N.
|
||||||
|
|
||||||
|
#server id, address and port (the ids from 0 to n-1 are the service replicas)
|
||||||
|
0 127.0.0.1 11000
|
||||||
|
1 127.0.0.1 11010
|
||||||
|
2 127.0.0.1 11020
|
||||||
|
3 127.0.0.1 11030
|
||||||
|
4 127.0.0.1 11040
|
||||||
|
5 127.0.0.1 11050
|
||||||
|
6 127.0.0.1 11060
|
||||||
|
7 127.0.0.1 11070
|
||||||
|
7001 127.0.0.1 11100
|
118
node/bft-smart-config/system.config
Normal file
118
node/bft-smart-config/system.config
Normal file
@ -0,0 +1,118 @@
|
|||||||
|
# Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
############################################
|
||||||
|
####### Communication Configurations #######
|
||||||
|
############################################
|
||||||
|
|
||||||
|
#HMAC algorithm used to authenticate messages between processes (HmacMD5 is the default value)
|
||||||
|
#This parameter is not currently being used being used
|
||||||
|
#system.authentication.hmacAlgorithm = HmacSHA1
|
||||||
|
|
||||||
|
#Specify if the communication system should use a thread to send data (true or false)
|
||||||
|
system.communication.useSenderThread = true
|
||||||
|
|
||||||
|
#Force all processes to use the same public/private keys pair and secret key. This is useful when deploying experiments
|
||||||
|
#and benchmarks, but must not be used in production systems.
|
||||||
|
system.communication.defaultkeys = true
|
||||||
|
|
||||||
|
############################################
|
||||||
|
### Replication Algorithm Configurations ###
|
||||||
|
############################################
|
||||||
|
|
||||||
|
#Number of servers in the group
|
||||||
|
system.servers.num = 4
|
||||||
|
|
||||||
|
#Maximum number of faulty replicas
|
||||||
|
system.servers.f = 1
|
||||||
|
|
||||||
|
#Timeout to asking for a client request
|
||||||
|
system.totalordermulticast.timeout = 2000
|
||||||
|
|
||||||
|
|
||||||
|
#Maximum batch size (in number of messages)
|
||||||
|
system.totalordermulticast.maxbatchsize = 400
|
||||||
|
|
||||||
|
#Number of nonces (for non-determinism actions) generated
|
||||||
|
system.totalordermulticast.nonces = 10
|
||||||
|
|
||||||
|
#if verification of leader-generated timestamps are increasing
|
||||||
|
#it can only be used on systems in which the network clocks
|
||||||
|
#are synchronized
|
||||||
|
system.totalordermulticast.verifyTimestamps = false
|
||||||
|
|
||||||
|
#Quantity of messages that can be stored in the receive queue of the communication system
|
||||||
|
system.communication.inQueueSize = 500000
|
||||||
|
|
||||||
|
# Quantity of messages that can be stored in the send queue of each replica
|
||||||
|
system.communication.outQueueSize = 500000
|
||||||
|
|
||||||
|
#Set to 1 if SMaRt should use signatures, set to 0 if otherwise
|
||||||
|
system.communication.useSignatures = 0
|
||||||
|
|
||||||
|
#Set to 1 if SMaRt should use MAC's, set to 0 if otherwise
|
||||||
|
system.communication.useMACs = 1
|
||||||
|
|
||||||
|
#Set to 1 if SMaRt should use the standard output to display debug messages, set to 0 if otherwise
|
||||||
|
system.debug = 0
|
||||||
|
|
||||||
|
#Print information about the replica when it is shutdown
|
||||||
|
system.shutdownhook = true
|
||||||
|
|
||||||
|
############################################
|
||||||
|
###### State Transfer Configurations #######
|
||||||
|
############################################
|
||||||
|
|
||||||
|
#Activate the state transfer protocol ('true' to activate, 'false' to de-activate)
|
||||||
|
system.totalordermulticast.state_transfer = false
|
||||||
|
|
||||||
|
#Maximum ahead-of-time message not discarded
|
||||||
|
system.totalordermulticast.highMark = 10000
|
||||||
|
|
||||||
|
#Maximum ahead-of-time message not discarded when the replica is still on EID 0 (after which the state transfer is triggered)
|
||||||
|
system.totalordermulticast.revival_highMark = 10
|
||||||
|
|
||||||
|
#Number of ahead-of-time messages necessary to trigger the state transfer after a request timeout occurs
|
||||||
|
system.totalordermulticast.timeout_highMark = 200
|
||||||
|
|
||||||
|
############################################
|
||||||
|
###### Log and Checkpoint Configurations ###
|
||||||
|
############################################
|
||||||
|
|
||||||
|
system.totalordermulticast.log = false
|
||||||
|
system.totalordermulticast.log_parallel = false
|
||||||
|
system.totalordermulticast.log_to_disk = false
|
||||||
|
system.totalordermulticast.sync_log = false
|
||||||
|
|
||||||
|
#Period at which BFT-SMaRt requests the state to the application (for the state transfer state protocol)
|
||||||
|
system.totalordermulticast.checkpoint_period = 1
|
||||||
|
system.totalordermulticast.global_checkpoint_period = 1
|
||||||
|
|
||||||
|
system.totalordermulticast.checkpoint_to_disk = false
|
||||||
|
system.totalordermulticast.sync_ckp = false
|
||||||
|
|
||||||
|
|
||||||
|
############################################
|
||||||
|
###### Reconfiguration Configurations ######
|
||||||
|
############################################
|
||||||
|
|
||||||
|
#Replicas ID for the initial view, separated by a comma.
|
||||||
|
# The number of replicas in this parameter should be equal to that specified in 'system.servers.num'
|
||||||
|
system.initial.view = 0,1,2,3
|
||||||
|
|
||||||
|
#The ID of the trust third party (TTP)
|
||||||
|
system.ttp.id = 7002
|
||||||
|
|
||||||
|
#This sets if the system will function in Byzantine or crash-only mode. Set to "true" to support Byzantine faults
|
||||||
|
system.bft = true
|
@ -157,6 +157,10 @@ dependencies {
|
|||||||
// OkHTTP: Simple HTTP library.
|
// OkHTTP: Simple HTTP library.
|
||||||
compile "com.squareup.okhttp3:okhttp:$okhttp_version"
|
compile "com.squareup.okhttp3:okhttp:$okhttp_version"
|
||||||
|
|
||||||
|
// BFT-SMaRt
|
||||||
|
compile 'commons-codec:commons-codec:1.10'
|
||||||
|
compile 'com.github.bft-smart:library:master-v1.1-beta-g6215ec8-87'
|
||||||
|
|
||||||
// Integration test helpers
|
// Integration test helpers
|
||||||
integrationTestCompile "junit:junit:$junit_version"
|
integrationTestCompile "junit:junit:$junit_version"
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,8 @@ import net.corda.node.services.messaging.NodeMessagingClient
|
|||||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||||
import net.corda.node.services.transactions.RaftUniquenessProvider
|
import net.corda.node.services.transactions.RaftUniquenessProvider
|
||||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||||
|
import net.corda.node.services.transactions.BFTSmartUniquenessProvider
|
||||||
|
import net.corda.node.services.transactions.BFTValidatingNotaryService
|
||||||
import net.corda.node.utilities.AffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor
|
||||||
import net.corda.node.utilities.databaseTransaction
|
import net.corda.node.utilities.databaseTransaction
|
||||||
import org.jetbrains.exposed.sql.Database
|
import org.jetbrains.exposed.sql.Database
|
||||||
@ -163,6 +165,9 @@ class Node(override val configuration: FullNodeConfiguration,
|
|||||||
RaftValidatingNotaryService.type -> with(configuration) {
|
RaftValidatingNotaryService.type -> with(configuration) {
|
||||||
RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
|
RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
|
||||||
}
|
}
|
||||||
|
BFTValidatingNotaryService.type -> with(configuration) {
|
||||||
|
BFTSmartUniquenessProvider(notaryNodeAddress!!, notaryClusterAddresses, database)
|
||||||
|
}
|
||||||
else -> PersistentUniquenessProvider()
|
else -> PersistentUniquenessProvider()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,77 @@
|
|||||||
|
package net.corda.node.services.transactions
|
||||||
|
|
||||||
|
import com.google.common.net.HostAndPort
|
||||||
|
import net.corda.core.contracts.StateRef
|
||||||
|
import net.corda.core.crypto.Party
|
||||||
|
import net.corda.core.crypto.SecureHash
|
||||||
|
import net.corda.core.node.services.UniquenessException
|
||||||
|
import net.corda.core.node.services.UniquenessProvider
|
||||||
|
import net.corda.core.utilities.loggerFor
|
||||||
|
import org.jetbrains.exposed.sql.Database
|
||||||
|
import kotlin.concurrent.thread
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A [UniquenessProvider] based on the [bft-smart library](https://github.com/bft-smart/library).
|
||||||
|
*
|
||||||
|
* Experimental, not ready for production yet.
|
||||||
|
*
|
||||||
|
* A [BFTSmartUniquenessProvider] starts a [BFTSmartServer] that joins the notary cluster and stores committed input
|
||||||
|
* states and a [BFTSmartClient] to commit states to the notary cluster.
|
||||||
|
*
|
||||||
|
* @param clusterAddresses the addresses of all BFTSmartUniquenessProviders of the notary cluster
|
||||||
|
* @param myAddress the address of this uniqueness provider, must be listed in clusterAddresses
|
||||||
|
*/
|
||||||
|
class BFTSmartUniquenessProvider(val myAddress: HostAndPort, val clusterAddresses: List<HostAndPort>, val db: Database) : UniquenessProvider {
|
||||||
|
// TODO: Write bft-smart host config file based on Corda node configuration.
|
||||||
|
// TODO: Define and document the configuration of the bft-smart cluster.
|
||||||
|
|
||||||
|
// TODO: Potentially update the bft-smart API for our use case or rebuild client and server from lower level building
|
||||||
|
// blocks bft-smart provides.
|
||||||
|
|
||||||
|
// TODO: Support cluster membership changes. This requires reading about reconfiguration of bft-smart clusters and
|
||||||
|
// perhaps a design doc. In general, it seems possible to use the state machine to reconfigure the cluster (reaching
|
||||||
|
// consensus about membership changes). Nodes that join the cluster for the first time or re-join can go through
|
||||||
|
// a "recovering" state and request missing data from their peers.
|
||||||
|
|
||||||
|
init {
|
||||||
|
require(myAddress in clusterAddresses) {
|
||||||
|
"expected myAddress '$myAddress' to be listed in clusterAddresses '$clusterAddresses'"
|
||||||
|
}
|
||||||
|
startServerThread()
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
private val log = loggerFor<BFTSmartUniquenessProvider>()
|
||||||
|
}
|
||||||
|
|
||||||
|
private val bftClient = BFTSmartClient<StateRef, UniquenessProvider.ConsumingTx>(clientId())
|
||||||
|
|
||||||
|
/** Throws UniquenessException if conflict is detected */
|
||||||
|
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||||
|
val entries = states.mapIndexed { i, stateRef ->
|
||||||
|
stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity)
|
||||||
|
}.toMap()
|
||||||
|
val conflicts = bftClient.put(entries)
|
||||||
|
if (conflicts.isNotEmpty()) {
|
||||||
|
throw UniquenessException(UniquenessProvider.Conflict(conflicts))
|
||||||
|
}
|
||||||
|
log.debug("All input states of transaction $txId have been committed")
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun serverId(): Int {
|
||||||
|
return clusterAddresses.indexOf(myAddress)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun clientId(): Int {
|
||||||
|
// 10k IDs are reserved for servers.
|
||||||
|
require(clusterAddresses.size <= 10000)
|
||||||
|
return 10000 + serverId()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun startServerThread() {
|
||||||
|
val id = serverId()
|
||||||
|
thread(name="BFTSmartServer-$id", isDaemon=true) {
|
||||||
|
BFTSmartServer<StateRef, UniquenessProvider.ConsumingTx>(id, db, "bft_smart_notary_committed_states")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,26 @@
|
|||||||
|
package net.corda.node.services.transactions
|
||||||
|
|
||||||
|
import net.corda.core.crypto.Party
|
||||||
|
import net.corda.core.node.services.TimestampChecker
|
||||||
|
import net.corda.flows.ValidatingNotaryFlow
|
||||||
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A validating notary service operated by a group of parties that don't necessarily trust each other.
|
||||||
|
*
|
||||||
|
* To validate a transaction, this service collects proofs that the transaction has been validated and committed by a
|
||||||
|
* specified number of notary nodes.
|
||||||
|
*
|
||||||
|
* Based on the [bft-smart library](https://github.com/bft-smart/library).
|
||||||
|
*/
|
||||||
|
class BFTValidatingNotaryService(services: ServiceHubInternal,
|
||||||
|
val timestampChecker: TimestampChecker,
|
||||||
|
val uniquenessProvider: BFTSmartUniquenessProvider) : NotaryService(services) {
|
||||||
|
companion object {
|
||||||
|
val type = ValidatingNotaryService.type.getSubType("bft")
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
|
||||||
|
return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,117 @@
|
|||||||
|
package net.corda.node.services.transactions
|
||||||
|
|
||||||
|
import bftsmart.tom.ServiceProxy
|
||||||
|
import bftsmart.tom.MessageContext
|
||||||
|
import bftsmart.tom.ServiceReplica
|
||||||
|
import bftsmart.tom.server.defaultservices.DefaultRecoverable
|
||||||
|
import bftsmart.tom.server.defaultservices.DefaultReplier
|
||||||
|
import net.corda.core.serialization.deserialize
|
||||||
|
import net.corda.core.serialization.serialize
|
||||||
|
import net.corda.node.utilities.JDBCHashMap
|
||||||
|
import net.corda.node.utilities.databaseTransaction
|
||||||
|
import org.jetbrains.exposed.sql.Database
|
||||||
|
import java.util.LinkedHashMap
|
||||||
|
|
||||||
|
enum class RequestType {
|
||||||
|
Get,
|
||||||
|
Put
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Sent from [BFTSmartClient] to [BFTSmartServer] */
|
||||||
|
data class Request(val type: RequestType, val data: Any)
|
||||||
|
|
||||||
|
class BFTSmartClient<K: Any, V: Any>(id: Int) {
|
||||||
|
|
||||||
|
val clientProxy = ServiceProxy(id, "bft-smart-config")
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns conflicts as a map of conflicting keys and their stored values or an empty map if all entries are
|
||||||
|
* committed without conflicts.
|
||||||
|
*/
|
||||||
|
fun put(entries: Map<K, V>): Map<K, V> {
|
||||||
|
val request = Request(RequestType.Put, entries)
|
||||||
|
val responseBytes = clientProxy.invokeOrdered(request.serialize().bytes)
|
||||||
|
return responseBytes.deserialize<Map<K, V>>()
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the value associated with the key or null if no value is stored under the key. */
|
||||||
|
fun get(key: K): V? {
|
||||||
|
val request = Request(RequestType.Get, key)
|
||||||
|
val responseBytes = clientProxy.invokeUnordered(request.serialize().bytes) ?: return null
|
||||||
|
return responseBytes.deserialize<V>()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class BFTSmartServer<K: Any, V: Any>(val id: Int, val db: Database, tableName: String) : DefaultRecoverable() {
|
||||||
|
// TODO: Exception handling when processing client input.
|
||||||
|
|
||||||
|
// TODO: Use Requery with proper DB schema instead of JDBCHashMap.
|
||||||
|
val table = databaseTransaction(db) { JDBCHashMap<K, V>(tableName) }
|
||||||
|
|
||||||
|
// TODO: Looks like this statement is blocking. Investigate the bft-smart node startup.
|
||||||
|
val replica = ServiceReplica(id, "bft-smart-config", this, this, null, DefaultReplier())
|
||||||
|
|
||||||
|
@Suppress("UNUSED_PARAMETER")
|
||||||
|
override fun appExecuteUnordered(command: ByteArray, msgCtx: MessageContext): ByteArray? {
|
||||||
|
// TODO: collect signatures from msgCtx
|
||||||
|
val request = command.deserialize<Request>()
|
||||||
|
when (request.type) {
|
||||||
|
RequestType.Get -> {
|
||||||
|
val v = table[request.data] ?: return null
|
||||||
|
return v.serialize().bytes
|
||||||
|
}
|
||||||
|
else -> {
|
||||||
|
throw Exception("Unhandled request type: ${request.type}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun appExecuteBatch(command: Array<ByteArray>, mcs: Array<MessageContext>): Array<ByteArray?> {
|
||||||
|
val replies = command.zip(mcs) { c, m ->
|
||||||
|
executeSingle(c, m)
|
||||||
|
}
|
||||||
|
return replies.toTypedArray()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Suppress("UNUSED_PARAMETER")
|
||||||
|
private fun executeSingle(command: ByteArray, msgCtx: MessageContext): ByteArray? {
|
||||||
|
// TODO: collect signatures from msgCtx
|
||||||
|
val request = command.deserialize<Request>()
|
||||||
|
val conflicts = mutableMapOf<K, V>()
|
||||||
|
when (request.type) {
|
||||||
|
RequestType.Put -> {
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
|
val m = request.data as Map<K, V>
|
||||||
|
databaseTransaction(db) {
|
||||||
|
for (k in m.keys) table[k]?.let { conflicts[k] = it }
|
||||||
|
if (conflicts.isEmpty()) table.putAll(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else -> {
|
||||||
|
throw Exception("Unhandled request type: ${request.type}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return conflicts.serialize().bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO:
|
||||||
|
// - Test snapshot functionality with different bft-smart cluster configurations.
|
||||||
|
// - Add streaming to support large data sets.
|
||||||
|
override fun getSnapshot(): ByteArray {
|
||||||
|
// LinkedHashMap for deterministic serialisation
|
||||||
|
// TODO: Simply use an array of pairs.
|
||||||
|
val m = LinkedHashMap<K, V>()
|
||||||
|
databaseTransaction(db) {
|
||||||
|
table.forEach { m[it.key] = it.value }
|
||||||
|
}
|
||||||
|
return m.serialize().bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun installSnapshot(bytes: ByteArray) {
|
||||||
|
val m = bytes.deserialize<LinkedHashMap<K, V>>()
|
||||||
|
databaseTransaction(db) {
|
||||||
|
table.clear()
|
||||||
|
table.putAll(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,70 @@
|
|||||||
|
package net.corda.node.services
|
||||||
|
|
||||||
|
import net.corda.node.services.transactions.BFTSmartClient
|
||||||
|
import net.corda.node.services.transactions.BFTSmartServer
|
||||||
|
import net.corda.node.utilities.configureDatabase
|
||||||
|
import net.corda.testing.node.makeTestDataSourceProperties
|
||||||
|
import org.jetbrains.exposed.sql.Database
|
||||||
|
import org.jetbrains.exposed.sql.Transaction
|
||||||
|
import org.junit.After
|
||||||
|
import org.junit.Before
|
||||||
|
import org.junit.Test
|
||||||
|
import java.io.Closeable
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
import kotlin.concurrent.thread
|
||||||
|
|
||||||
|
class DistributedImmutableBFTMapTests {
|
||||||
|
|
||||||
|
// TODO: Setup Corda cluster instead of starting server threads (see DistributedImmutableMapTests).
|
||||||
|
|
||||||
|
lateinit var dataSource: Closeable
|
||||||
|
lateinit var database: Database
|
||||||
|
|
||||||
|
@Before
|
||||||
|
fun setup() {
|
||||||
|
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||||
|
dataSource = dataSourceAndDatabase.first
|
||||||
|
database = dataSourceAndDatabase.second
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
fun tearDown() {
|
||||||
|
dataSource.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `stores entries correctly and detects conflicts`() {
|
||||||
|
val threads = (0..3).map { i ->
|
||||||
|
thread { BFTSmartServer<String, String>(i, database, "bft_notary_committed_states_$i") }.apply { Thread.sleep(500) }
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(3000) // TODO: Get notified when the servers are ready.
|
||||||
|
|
||||||
|
val client = BFTSmartClient<String, String>(1001)
|
||||||
|
|
||||||
|
val m = mapOf("a" to "b", "c" to "d", "e" to "f")
|
||||||
|
|
||||||
|
val conflicts = client.put(m)
|
||||||
|
assertEquals(mapOf(), conflicts)
|
||||||
|
|
||||||
|
for ((k, v) in m) {
|
||||||
|
val r = client.get(k)
|
||||||
|
assertEquals(v, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
val conflicts2 = client.put(mapOf("a" to "b2"))
|
||||||
|
assertEquals(mapOf("a" to "b"), conflicts2)
|
||||||
|
|
||||||
|
// Values are not mutated.
|
||||||
|
for ((k, v) in m) {
|
||||||
|
val r = client.get(k)
|
||||||
|
assertEquals(v, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Null response encodes 'not found'.
|
||||||
|
val r = client.get("x")
|
||||||
|
assertEquals(null, r)
|
||||||
|
|
||||||
|
threads.forEach { t -> t.stop() }
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user