mirror of
https://github.com/corda/corda.git
synced 2025-01-18 18:56:28 +00:00
Merged in andrius-distributed-notary-2 (pull request #480)
This commit is contained in:
commit
3110a9bb64
@ -7,6 +7,8 @@ import net.corda.core.crypto.SecureHash
|
||||
/**
|
||||
* A service that records input states of the given transaction and provides conflict information
|
||||
* if any of the inputs have already been used in another transaction.
|
||||
*
|
||||
* A uniqueness provider is expected to be used from within the context of a flow.
|
||||
*/
|
||||
interface UniquenessProvider {
|
||||
/** Commits all input states of the given transaction */
|
||||
|
@ -96,6 +96,10 @@ Configuration File Fields
|
||||
|
||||
:extraAdvertisedServiceIds: A list of ServiceType id strings to be advertised to the NetworkMapService and thus be available when other nodes query the NetworkMapCache for supporting nodes. This can also include plugin services loaded from .jar files in the plugins folder.
|
||||
|
||||
:notaryNodeAddress: The host and port to which to bind the embedded Raft server. Required only when running a distributed notary service. A group of Corda nodes can run a distributed notary service by each running an embedded Raft server and joining them to the same cluster to replicate the committed state log. Note that the Raft cluster uses a separate transport layer for communication that does not integrate with ArtemisMQ messaging services.
|
||||
|
||||
:notaryClusterAddresses: List of Raft cluster member addresses used to joining the cluster. At least one of the specified members must be active and be able to communicate with the cluster leader for joining. If empty, a new cluster will be bootstrapped. Required only when running a distributed notary service.
|
||||
|
||||
:networkMapAddress: If `null`, or missing the node is declaring itself as the NetworkMapService host. Otherwise the configuration value is the remote HostAndPort string for the ArtemisMQ service on the hosting node.
|
||||
|
||||
:useHTTPS: If false the node's web server will be plain HTTP. If true the node will use the same certificate and private key from the ``<workspace>/certificates/sslkeystore.jks`` file as the ArtemisMQ port for HTTPS. If HTTPS is enabled then unencrypted HTTP traffic to the node's **webAddress** port is not supported.
|
||||
|
@ -145,6 +145,9 @@ dependencies {
|
||||
// Capsule is a library for building independently executable fat JARs.
|
||||
compile 'co.paralleluniverse:capsule:1.0.3'
|
||||
|
||||
// Java Atomix: RAFT library
|
||||
compile 'io.atomix:atomix-all:1.0.0-rc4'
|
||||
|
||||
// Integration test helpers
|
||||
integrationTestCompile 'junit:junit:4.12'
|
||||
|
||||
|
@ -38,9 +38,7 @@ import net.corda.node.services.persistence.*
|
||||
import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.transactions.NotaryService
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.node.services.transactions.*
|
||||
import net.corda.node.services.vault.CashBalanceAsMetricsObserver
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.*
|
||||
@ -214,7 +212,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
|
||||
// TODO: uniquenessProvider creation should be inside makeNotaryService(), but notary service initialisation
|
||||
// depends on smm, while smm depends on tokenizableServices, which uniquenessProvider is part of
|
||||
advertisedServices.singleOrNull { it.type.isNotary() }?.let {
|
||||
uniquenessProvider = makeUniquenessProvider()
|
||||
uniquenessProvider = makeUniquenessProvider(it.type)
|
||||
tokenizableServices.add(uniquenessProvider!!)
|
||||
}
|
||||
|
||||
@ -422,13 +420,14 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
|
||||
return when (type) {
|
||||
SimpleNotaryService.type -> SimpleNotaryService(services, timestampChecker, uniquenessProvider!!)
|
||||
ValidatingNotaryService.type -> ValidatingNotaryService(services, timestampChecker, uniquenessProvider!!)
|
||||
RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services, timestampChecker, uniquenessProvider!! as RaftUniquenessProvider)
|
||||
else -> {
|
||||
throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract fun makeUniquenessProvider(): UniquenessProvider
|
||||
protected abstract fun makeUniquenessProvider(type: ServiceType): UniquenessProvider
|
||||
|
||||
protected open fun makeIdentityService(): IdentityService {
|
||||
val service = InMemoryIdentityService()
|
||||
|
@ -5,6 +5,8 @@ import net.corda.core.div
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.node.services.ServiceType
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.then
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.serialization.NodeClock
|
||||
@ -16,6 +18,8 @@ import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.messaging.RPCOps
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.servlets.AttachmentDownloadServlet
|
||||
import net.corda.node.servlets.Config
|
||||
import net.corda.node.servlets.DataUploadServlet
|
||||
@ -259,7 +263,14 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
|
||||
}
|
||||
}
|
||||
|
||||
override fun makeUniquenessProvider() = PersistentUniquenessProvider()
|
||||
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider {
|
||||
return when (type) {
|
||||
RaftValidatingNotaryService.type -> with(configuration) {
|
||||
RaftUniquenessProvider(basedir, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
|
||||
}
|
||||
else -> PersistentUniquenessProvider()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the node is persisting to an embedded H2 database, then expose this via TCP with a JDBC URL of the form:
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services.config
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.typesafe.config.Config
|
||||
import net.corda.core.div
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
@ -9,7 +10,6 @@ import net.corda.node.serialization.NodeClock
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.utilities.TestClock
|
||||
import com.typesafe.config.Config
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
|
||||
@ -49,6 +49,8 @@ class FullNodeConfiguration(val config: Config) : NodeConfiguration {
|
||||
val messagingServerAddress: HostAndPort? by config.getOrElse { null }
|
||||
val extraAdvertisedServiceIds: String by config
|
||||
val useTestClock: Boolean by config.getOrElse { false }
|
||||
val notaryNodeAddress: HostAndPort? by config.getOrElse { null }
|
||||
val notaryClusterAddresses: List<HostAndPort> = config.getListOrElse<String>("notaryClusterAddresses") { emptyList<String>() }.map { HostAndPort.fromString(it) }
|
||||
|
||||
fun createNode(): Node {
|
||||
// This is a sanity feature do not remove.
|
||||
|
@ -0,0 +1,107 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import io.atomix.copycat.Command
|
||||
import io.atomix.copycat.Query
|
||||
import io.atomix.copycat.server.Commit
|
||||
import io.atomix.copycat.server.Snapshottable
|
||||
import io.atomix.copycat.server.StateMachine
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.utilities.JDBCHashMap
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* A distributed map state machine that doesn't allow overriding values. The state machine is replicated
|
||||
* across a Copycat Raft cluster.
|
||||
*
|
||||
* The map contents are backed by a JDBC table. State re-synchronisation is achieved by periodically persisting snapshots
|
||||
* to disk, and sharing them across the cluster. A new node joining the cluster will have to obtain and install a snapshot
|
||||
* containing the entire JDBC table contents.
|
||||
*/
|
||||
class DistributedImmutableMap<K : Any, V : Any>(val db: Database, tableName: String = DEFAULT_TABLE_NAME) : StateMachine(), Snapshottable {
|
||||
companion object {
|
||||
private val log = loggerFor<DistributedImmutableMap<*, *>>()
|
||||
private val DEFAULT_TABLE_NAME = "committed_states"
|
||||
}
|
||||
|
||||
object Commands {
|
||||
class PutAll<K, V>(val entries: Map<K, V>) : Command<Map<K, V>> {
|
||||
override fun compaction(): Command.CompactionMode {
|
||||
// The SNAPSHOT compaction mode indicates that a command can be removed from the Raft log once
|
||||
// a snapshot of the state machine has been written to disk
|
||||
return Command.CompactionMode.SNAPSHOT
|
||||
}
|
||||
}
|
||||
|
||||
class Size : Query<Int>
|
||||
class Get<out K, V>(val key: K) : Query<V?>
|
||||
}
|
||||
|
||||
private val map = databaseTransaction(db) { JDBCHashMap<K, V>(tableName) }
|
||||
|
||||
/** Gets a value for the given [Commands.Get.key] */
|
||||
fun get(commit: Commit<Commands.Get<K, V>>): V? {
|
||||
try {
|
||||
val key = commit.operation().key
|
||||
return databaseTransaction(db) { map[key] }
|
||||
} finally {
|
||||
commit.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the given [Commands.PutAll.entries] if no entry key already exists.
|
||||
*
|
||||
* @return map containing conflicting entries
|
||||
*/
|
||||
fun put(commit: Commit<Commands.PutAll<K, V>>): Map<K, V> {
|
||||
try {
|
||||
val conflicts = LinkedHashMap<K, V>()
|
||||
databaseTransaction(db) {
|
||||
val entries = commit.operation().entries
|
||||
log.debug("State machine commit: storing entries with keys (${entries.keys.joinToString()})")
|
||||
for (key in entries.keys) map[key]?.let { conflicts[key] = it }
|
||||
if (conflicts.isEmpty()) map.putAll(entries)
|
||||
}
|
||||
return conflicts
|
||||
} finally {
|
||||
commit.close()
|
||||
}
|
||||
}
|
||||
|
||||
fun size(commit: Commit<Commands.Size>): Int {
|
||||
try {
|
||||
return databaseTransaction(db) { map.size }
|
||||
} finally {
|
||||
commit.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes out all [map] entries to disk. Note that this operation does not load all entries into memory, as the
|
||||
* [SnapshotWriter] is using a disk-backed buffer internally, and iterating map entries results in only a
|
||||
* fixed number of recently accessed entries to ever be kept in memory.
|
||||
*/
|
||||
override fun snapshot(writer: SnapshotWriter) {
|
||||
databaseTransaction(db) {
|
||||
writer.writeInt(map.size)
|
||||
map.entries.forEach { writer.writeObject(it.key to it.value) }
|
||||
}
|
||||
}
|
||||
|
||||
/** Reads entries from disk and adds them to [map]. */
|
||||
override fun install(reader: SnapshotReader) {
|
||||
val size = reader.readInt()
|
||||
databaseTransaction(db) {
|
||||
map.clear()
|
||||
// TODO: read & put entries in batches
|
||||
for (i in 1..size) {
|
||||
val (key, value) = reader.readObject<Pair<K, V>>()
|
||||
map.put(key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,127 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import io.atomix.catalyst.transport.Address
|
||||
import io.atomix.catalyst.transport.NettyTransport
|
||||
import io.atomix.catalyst.transport.SslProtocol
|
||||
import io.atomix.catalyst.transport.Transport
|
||||
import io.atomix.copycat.client.CopycatClient
|
||||
import io.atomix.copycat.server.CopycatServer
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.services.UniquenessException
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.NodeSSLConfiguration
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* A uniqueness provider that records committed input states in a distributed collection replicated and
|
||||
* persisted in a Raft cluster, using the Copycat framework (http://atomix.io/copycat/).
|
||||
*
|
||||
* The uniqueness provider maintains both a Copycat cluster node (server) and a client through which it can submit
|
||||
* requests to the cluster. In Copycat, a client request is first sent to the server it's connected to and then redirected
|
||||
* to the cluster leader to be actioned.
|
||||
*
|
||||
* @param storagePath Directory storing the Raft log and state machine snapshots
|
||||
* @param myAddress Address of the Copycat node run by this Corda node
|
||||
* @param clusterAddresses List of node addresses in the existing Copycat cluster. At least one active node must be
|
||||
* provided to join the cluster. If empty, a new cluster will be bootstrapped.
|
||||
* @param db The database to store the state machine state in
|
||||
* @param config SSL configuration
|
||||
*/
|
||||
@ThreadSafe
|
||||
class RaftUniquenessProvider(storagePath: Path, myAddress: HostAndPort, clusterAddresses: List<HostAndPort>,
|
||||
db: Database, config: NodeSSLConfiguration) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = loggerFor<RaftUniquenessProvider>()
|
||||
}
|
||||
|
||||
private val _clientFuture: CompletableFuture<CopycatClient>
|
||||
/**
|
||||
* Copycat clients are responsible for connecting to the cluster and submitting commands and queries that operate
|
||||
* on the cluster's replicated state machine.
|
||||
*/
|
||||
private val client: CopycatClient
|
||||
get() = _clientFuture.get()
|
||||
|
||||
init {
|
||||
log.info("Creating Copycat server, log stored in: ${storagePath.toFile()}")
|
||||
val stateMachineFactory = { DistributedImmutableMap<String, ByteArray>(db) }
|
||||
val address = Address(myAddress.hostText, myAddress.port)
|
||||
val storage = buildStorage(storagePath)
|
||||
val transport = buildTransport(config)
|
||||
|
||||
val server = CopycatServer.builder(address)
|
||||
.withStateMachine(stateMachineFactory)
|
||||
.withStorage(storage)
|
||||
.withServerTransport(transport)
|
||||
.build()
|
||||
|
||||
val serverFuture = if (clusterAddresses.isNotEmpty()) {
|
||||
log.info("Joining an existing Copycat cluster at $clusterAddresses")
|
||||
val cluster = clusterAddresses.map { Address(it.hostText, it.port) }
|
||||
server.join(cluster)
|
||||
} else {
|
||||
log.info("Bootstrapping a Copycat cluster at $address")
|
||||
server.bootstrap()
|
||||
}
|
||||
|
||||
val client = CopycatClient.builder(address)
|
||||
.withTransport(transport)
|
||||
.build()
|
||||
_clientFuture = serverFuture.thenCompose { client.connect(address) }
|
||||
}
|
||||
|
||||
private fun buildStorage(storagePath: Path): Storage? {
|
||||
return Storage.builder()
|
||||
.withDirectory(storagePath.toFile())
|
||||
.withStorageLevel(StorageLevel.DISK)
|
||||
.build()
|
||||
}
|
||||
|
||||
private fun buildTransport(config: NodeSSLConfiguration): Transport? {
|
||||
return NettyTransport.builder()
|
||||
.withSsl()
|
||||
.withSslProtocol(SslProtocol.TLSv1_2)
|
||||
.withKeyStorePath(config.keyStorePath.toString())
|
||||
.withKeyStorePassword(config.keyStorePassword)
|
||||
.withTrustStorePath(config.trustStorePath.toString())
|
||||
.withTrustStorePassword(config.trustStorePassword)
|
||||
.build()
|
||||
}
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
val entries = states.mapIndexed { i, stateRef -> stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity) }
|
||||
|
||||
log.debug("Attempting to commit input states: ${states.joinToString()}")
|
||||
val commitCommand = DistributedImmutableMap.Commands.PutAll(encode(entries))
|
||||
val conflicts = client.submit(commitCommand).get()
|
||||
|
||||
if (conflicts.isNotEmpty()) throw UniquenessException(UniquenessProvider.Conflict(decode(conflicts)))
|
||||
log.debug("All input states of transaction $txId have been committed")
|
||||
}
|
||||
|
||||
/**
|
||||
* Copycat uses its own serialization framework so we convert and store entries as String -> ByteArray
|
||||
* here to avoid having to define additional serializers for our custom types.
|
||||
*/
|
||||
private fun encode(items: List<Pair<StateRef, UniquenessProvider.ConsumingTx>>): Map<String, ByteArray> {
|
||||
fun StateRef.encoded() = "$txhash:$index"
|
||||
return items.map { it.first.encoded() to it.second.serialize().bits }.toMap()
|
||||
}
|
||||
|
||||
private fun decode(items: Map<String, ByteArray>): Map<StateRef, UniquenessProvider.ConsumingTx> {
|
||||
fun String.toStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) }
|
||||
return items.map { it.key.toStateRef() to it.value.deserialize<UniquenessProvider.ConsumingTx>() }.toMap()
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.node.services.TimestampChecker
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.protocols.ValidatingNotaryProtocol
|
||||
|
||||
/** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
|
||||
class RaftValidatingNotaryService(services: ServiceHubInternal,
|
||||
val timestampChecker: TimestampChecker,
|
||||
val uniquenessProvider: RaftUniquenessProvider) : NotaryService(services) {
|
||||
companion object {
|
||||
val type = ValidatingNotaryService.type.getSubType("raft")
|
||||
}
|
||||
|
||||
override fun createProtocol(otherParty: Party): ValidatingNotaryProtocol {
|
||||
return ValidatingNotaryProtocol(otherParty, timestampChecker, uniquenessProvider)
|
||||
}
|
||||
}
|
@ -0,0 +1,110 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import io.atomix.catalyst.transport.Address
|
||||
import io.atomix.copycat.client.CopycatClient
|
||||
import io.atomix.copycat.server.CopycatServer
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.DistributedImmutableMap
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class DistributedImmutableMapTests {
|
||||
data class Member(val client: CopycatClient, val server: CopycatServer)
|
||||
|
||||
lateinit var cluster: List<Member>
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var transaction: Transaction
|
||||
lateinit var database: Database
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
LogHelper.setLevel("-org.apache.activemq")
|
||||
LogHelper.setLevel(NetworkMapService::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
cluster = setUpCluster()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
LogHelper.reset("org.apache.activemq")
|
||||
LogHelper.reset(NetworkMapService::class)
|
||||
cluster.forEach {
|
||||
it.client.close()
|
||||
it.server.shutdown()
|
||||
}
|
||||
dataSource.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `stores entries correctly`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val entries = mapOf("key1" to "value1", "key2" to "value2")
|
||||
|
||||
val conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).get()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
|
||||
val value1 = client.submit(DistributedImmutableMap.Commands.Get<String, String>("key1"))
|
||||
val value2 = client.submit(DistributedImmutableMap.Commands.Get<String, String>("key2"))
|
||||
|
||||
assertEquals(value1.get(), "value1")
|
||||
assertEquals(value2.get(), "value2")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `returns conflict for duplicate entries`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val entries = mapOf("key1" to "value1", "key2" to "value2")
|
||||
|
||||
var conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).get()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).get()
|
||||
assertTrue { conflict == entries }
|
||||
}
|
||||
|
||||
private fun setUpCluster(nodeCount: Int = 3): List<Member> {
|
||||
val clusterAddress = freeLocalHostAndPort()
|
||||
val cluster = mutableListOf(createReplica(clusterAddress))
|
||||
for (i in 1..nodeCount) cluster.add(createReplica(freeLocalHostAndPort(), clusterAddress))
|
||||
return cluster.map { it.get() }
|
||||
}
|
||||
|
||||
private fun createReplica(myAddress: HostAndPort, clusterAddress: HostAndPort? = null): CompletableFuture<Member> {
|
||||
val storage = Storage.builder().withStorageLevel(StorageLevel.MEMORY).build()
|
||||
val address = Address(myAddress.hostText, myAddress.port)
|
||||
|
||||
val stateMachineFactory = { DistributedImmutableMap<String, ByteArray>(database, "commited_states_${myAddress.port}") }
|
||||
|
||||
val server = CopycatServer.builder(address)
|
||||
.withStateMachine(stateMachineFactory)
|
||||
.withStorage(storage)
|
||||
.build()
|
||||
|
||||
val serverInitFuture = if (clusterAddress != null) {
|
||||
val cluster = Address(clusterAddress.hostText, clusterAddress.port)
|
||||
server.join(cluster)
|
||||
} else {
|
||||
server.bootstrap()
|
||||
}
|
||||
|
||||
val client = CopycatClient.builder(address).build()
|
||||
return serverInitFuture.thenCompose { client.connect(address) }.thenApply { Member(it, server) }
|
||||
}
|
||||
}
|
@ -0,0 +1,173 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.core.contracts.DummyContract
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionType
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.PublicKeyTree
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.tree
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.random63BitValue
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.services.config.ConfigHelper
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import net.corda.protocols.NotaryError
|
||||
import net.corda.protocols.NotaryException
|
||||
import net.corda.protocols.NotaryProtocol
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.File
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.security.KeyPair
|
||||
import java.util.*
|
||||
import java.util.concurrent.ExecutionException
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
// TODO: clean up and rewrite this using DriverDSL
|
||||
class DistributedNotaryTests {
|
||||
val baseDir = "build/notaryTest/${Date()}"
|
||||
val notaryName = "Notary Service"
|
||||
val clusterSize = 3
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
LogHelper.setLevel("-org.apache.activemq")
|
||||
LogHelper.setLevel(NetworkMapService::class)
|
||||
File(baseDir).mkdirs()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
LogHelper.reset("org.apache.activemq")
|
||||
LogHelper.reset(NetworkMapService::class)
|
||||
File(baseDir).deleteRecursively()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should detect double spend`() {
|
||||
val masterNode = createNotaryCluster()
|
||||
val alice = createAliceNode(masterNode.net.myAddress)
|
||||
|
||||
val notaryParty = alice.netMapCache.getAnyNotary(RaftValidatingNotaryService.type)!!
|
||||
|
||||
val stx = run {
|
||||
val notaryNodeKeyPair = databaseTransaction(masterNode.database) { masterNode.services.notaryIdentityKey }
|
||||
val inputState = issueState(alice, notaryParty, notaryNodeKeyPair)
|
||||
val tx = TransactionType.General.Builder(notaryParty).withItems(inputState)
|
||||
val aliceKey = databaseTransaction(alice.database) { alice.services.legalIdentityKey }
|
||||
tx.signWith(aliceKey)
|
||||
tx.toSignedTransaction(false)
|
||||
}
|
||||
|
||||
val buildProtocol = { NotaryProtocol.Client(stx) }
|
||||
|
||||
val firstSpend = alice.services.startProtocol(buildProtocol())
|
||||
firstSpend.resultFuture.get()
|
||||
|
||||
val secondSpend = alice.services.startProtocol(buildProtocol())
|
||||
|
||||
val ex = assertFailsWith(ExecutionException::class) { secondSpend.resultFuture.get() }
|
||||
val error = (ex.cause as NotaryException).error as NotaryError.Conflict
|
||||
assertEquals(error.tx, stx.tx)
|
||||
}
|
||||
|
||||
private fun createNotaryCluster(): Node {
|
||||
val notaryClusterAddress = freeLocalHostAndPort()
|
||||
val keyPairs = (1..clusterSize).map { generateKeyPair() }
|
||||
|
||||
val notaryKeyTree = PublicKeyTree.Builder().addKeys(keyPairs.map { it.public.tree }).build(1)
|
||||
val notaryParty = Party(notaryName, notaryKeyTree).serialize()
|
||||
|
||||
var networkMapAddress: SingleMessageRecipient? = null
|
||||
|
||||
val cluster = keyPairs.mapIndexed { i, keyPair ->
|
||||
val dir = Paths.get(baseDir, "notaryNode$i")
|
||||
Files.createDirectories(dir)
|
||||
|
||||
val privateKeyFile = RaftValidatingNotaryService.type.id + "-private-key"
|
||||
val publicKeyFile = RaftValidatingNotaryService.type.id + "-public"
|
||||
|
||||
notaryParty.writeToFile(dir.resolve(publicKeyFile))
|
||||
keyPair.serialize().writeToFile(dir.resolve(privateKeyFile))
|
||||
|
||||
val node: Node
|
||||
if (networkMapAddress == null) {
|
||||
val config = generateConfig(dir, "node" + random63BitValue(), notaryClusterAddress)
|
||||
node = createNotaryNode(config)
|
||||
networkMapAddress = node.net.myAddress
|
||||
} else {
|
||||
val config = generateConfig(dir, "node" + random63BitValue(), freeLocalHostAndPort(), notaryClusterAddress)
|
||||
node = createNotaryNode(config, networkMapAddress)
|
||||
}
|
||||
|
||||
node
|
||||
}
|
||||
|
||||
return cluster.first()
|
||||
}
|
||||
|
||||
private fun createNotaryNode(config: FullNodeConfiguration, networkMapAddress: SingleMessageRecipient? = null): Node {
|
||||
val extraAdvertisedServices = if (networkMapAddress == null) setOf(ServiceInfo(NetworkMapService.type, "NMS")) else emptySet<ServiceInfo>()
|
||||
|
||||
val notaryNode = Node(
|
||||
configuration = config,
|
||||
advertisedServices = extraAdvertisedServices + ServiceInfo(RaftValidatingNotaryService.type, notaryName),
|
||||
networkMapAddress = networkMapAddress)
|
||||
|
||||
notaryNode.setup().start()
|
||||
thread { notaryNode.run() }
|
||||
notaryNode.networkMapRegistrationFuture.get()
|
||||
return notaryNode
|
||||
}
|
||||
|
||||
private fun createAliceNode(networkMapAddress: SingleMessageRecipient): Node {
|
||||
val aliceDir = Paths.get(baseDir, "alice")
|
||||
val alice = Node(
|
||||
configuration = generateConfig(aliceDir, "Alice"),
|
||||
advertisedServices = setOf(),
|
||||
networkMapAddress = networkMapAddress)
|
||||
alice.setup().start()
|
||||
thread { alice.run() }
|
||||
alice.networkMapRegistrationFuture.get()
|
||||
return alice
|
||||
}
|
||||
|
||||
private fun issueState(node: AbstractNode, notary: Party, notaryKey: KeyPair): StateAndRef<*> {
|
||||
return databaseTransaction(node.database) {
|
||||
val tx = DummyContract.generateInitial(node.info.legalIdentity.ref(0), Random().nextInt(), notary)
|
||||
tx.signWith(node.services.legalIdentityKey)
|
||||
tx.signWith(notaryKey)
|
||||
val stx = tx.toSignedTransaction()
|
||||
node.services.recordTransactions(listOf(stx))
|
||||
StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
|
||||
}
|
||||
}
|
||||
|
||||
private fun generateConfig(dir: Path, name: String, notaryNodeAddress: HostAndPort? = null, notaryClusterAddress: HostAndPort? = null) = FullNodeConfiguration(
|
||||
ConfigHelper.loadConfig(dir,
|
||||
allowMissingConfig = true,
|
||||
configOverrides = mapOf(
|
||||
"myLegalName" to name,
|
||||
"basedir" to dir,
|
||||
"artemisAddress" to freeLocalHostAndPort().toString(),
|
||||
"webAddress" to freeLocalHostAndPort().toString(),
|
||||
"notaryNodeAddress" to notaryNodeAddress?.toString(),
|
||||
"notaryClusterAddresses" to (if (notaryClusterAddress == null) emptyList<String>() else listOf(notaryClusterAddress.toString()))
|
||||
)))
|
||||
}
|
@ -9,9 +9,7 @@ import net.corda.core.crypto.Party
|
||||
import net.corda.core.div
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.PhysicalLocation
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.random63BitValue
|
||||
import net.corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import net.corda.core.utilities.loggerFor
|
||||
@ -147,7 +145,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
// There is no need to slow down the unit tests by initialising CityDatabase
|
||||
override fun findMyLocation(): PhysicalLocation? = null
|
||||
|
||||
override fun makeUniquenessProvider() = InMemoryUniquenessProvider()
|
||||
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider = InMemoryUniquenessProvider()
|
||||
|
||||
override fun start(): MockNode {
|
||||
super.start()
|
||||
|
Loading…
Reference in New Issue
Block a user