mirror of
https://github.com/corda/corda.git
synced 2025-06-18 15:18:16 +00:00
Added a distributed notary that uses the Raft consensus algorithm (implemented by Copycat).
The notary operates by storing committed input states in a map state machine which is replicated across the Copycat cluster.
This commit is contained in:
@ -38,9 +38,7 @@ import net.corda.node.services.persistence.*
|
||||
import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.transactions.NotaryService
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.node.services.transactions.*
|
||||
import net.corda.node.services.vault.CashBalanceAsMetricsObserver
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.*
|
||||
@ -214,7 +212,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
|
||||
// TODO: uniquenessProvider creation should be inside makeNotaryService(), but notary service initialisation
|
||||
// depends on smm, while smm depends on tokenizableServices, which uniquenessProvider is part of
|
||||
advertisedServices.singleOrNull { it.type.isNotary() }?.let {
|
||||
uniquenessProvider = makeUniquenessProvider()
|
||||
uniquenessProvider = makeUniquenessProvider(it.type)
|
||||
tokenizableServices.add(uniquenessProvider!!)
|
||||
}
|
||||
|
||||
@ -422,13 +420,14 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
|
||||
return when (type) {
|
||||
SimpleNotaryService.type -> SimpleNotaryService(services, timestampChecker, uniquenessProvider!!)
|
||||
ValidatingNotaryService.type -> ValidatingNotaryService(services, timestampChecker, uniquenessProvider!!)
|
||||
RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services, timestampChecker, uniquenessProvider!! as RaftUniquenessProvider)
|
||||
else -> {
|
||||
throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract fun makeUniquenessProvider(): UniquenessProvider
|
||||
protected abstract fun makeUniquenessProvider(type: ServiceType): UniquenessProvider
|
||||
|
||||
protected open fun makeIdentityService(): IdentityService {
|
||||
val service = InMemoryIdentityService()
|
||||
|
@ -5,6 +5,8 @@ import net.corda.core.div
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.node.services.ServiceType
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.then
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.serialization.NodeClock
|
||||
@ -16,6 +18,8 @@ import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.messaging.RPCOps
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.servlets.AttachmentDownloadServlet
|
||||
import net.corda.node.servlets.Config
|
||||
import net.corda.node.servlets.DataUploadServlet
|
||||
@ -259,7 +263,14 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
|
||||
}
|
||||
}
|
||||
|
||||
override fun makeUniquenessProvider() = PersistentUniquenessProvider()
|
||||
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider {
|
||||
return when (type) {
|
||||
RaftValidatingNotaryService.type -> with(configuration) {
|
||||
RaftUniquenessProvider(basedir, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
|
||||
}
|
||||
else -> PersistentUniquenessProvider()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the node is persisting to an embedded H2 database, then expose this via TCP with a JDBC URL of the form:
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services.config
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.typesafe.config.Config
|
||||
import net.corda.core.div
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
@ -9,7 +10,6 @@ import net.corda.node.serialization.NodeClock
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.utilities.TestClock
|
||||
import com.typesafe.config.Config
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
|
||||
@ -49,6 +49,8 @@ class FullNodeConfiguration(val config: Config) : NodeConfiguration {
|
||||
val messagingServerAddress: HostAndPort? by config.getOrElse { null }
|
||||
val extraAdvertisedServiceIds: String by config
|
||||
val useTestClock: Boolean by config.getOrElse { false }
|
||||
val notaryNodeAddress: HostAndPort? by config.getOrElse { null }
|
||||
val notaryClusterAddresses: List<HostAndPort> = config.getListOrElse<String>("notaryClusterAddresses") { emptyList<String>() }.map { HostAndPort.fromString(it) }
|
||||
|
||||
fun createNode(): Node {
|
||||
// This is a sanity feature do not remove.
|
||||
|
@ -0,0 +1,107 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import io.atomix.copycat.Command
|
||||
import io.atomix.copycat.Query
|
||||
import io.atomix.copycat.server.Commit
|
||||
import io.atomix.copycat.server.Snapshottable
|
||||
import io.atomix.copycat.server.StateMachine
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.utilities.JDBCHashMap
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* A distributed map state machine that doesn't allow overriding values. The state machine is replicated
|
||||
* across a Copycat Raft cluster.
|
||||
*
|
||||
* The map contents are backed by a JDBC table. State re-synchronisation is achieved by periodically persisting snapshots
|
||||
* to disk, and sharing them across the cluster. A new node joining the cluster will have to obtain and install a snapshot
|
||||
* containing the entire JDBC table contents.
|
||||
*/
|
||||
class DistributedImmutableMap<K : Any, V : Any>(val db: Database, tableName: String = DEFAULT_TABLE_NAME) : StateMachine(), Snapshottable {
|
||||
companion object {
|
||||
private val log = loggerFor<DistributedImmutableMap<*, *>>()
|
||||
private val DEFAULT_TABLE_NAME = "committed_states"
|
||||
}
|
||||
|
||||
object Commands {
|
||||
class PutAll<K, V>(val entries: Map<K, V>) : Command<Map<K, V>> {
|
||||
override fun compaction(): Command.CompactionMode {
|
||||
// The SNAPSHOT compaction mode indicates that a command can be removed from the Raft log once
|
||||
// a snapshot of the state machine has been written to disk
|
||||
return Command.CompactionMode.SNAPSHOT
|
||||
}
|
||||
}
|
||||
|
||||
class Size : Query<Int>
|
||||
class Get<out K, V>(val key: K) : Query<V?>
|
||||
}
|
||||
|
||||
private val map = databaseTransaction(db) { JDBCHashMap<K, V>(tableName) }
|
||||
|
||||
/** Gets a value for the given [Commands.Get.key] */
|
||||
fun get(commit: Commit<Commands.Get<K, V>>): V? {
|
||||
try {
|
||||
val key = commit.operation().key
|
||||
return databaseTransaction(db) { map[key] }
|
||||
} finally {
|
||||
commit.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the given [Commands.PutAll.entries] if no entry key already exists.
|
||||
*
|
||||
* @return map containing conflicting entries
|
||||
*/
|
||||
fun put(commit: Commit<Commands.PutAll<K, V>>): Map<K, V> {
|
||||
try {
|
||||
val conflicts = LinkedHashMap<K, V>()
|
||||
databaseTransaction(db) {
|
||||
val entries = commit.operation().entries
|
||||
log.debug("State machine commit: storing entries with keys (${entries.keys.joinToString()})")
|
||||
for (key in entries.keys) map[key]?.let { conflicts[key] = it }
|
||||
if (conflicts.isEmpty()) map.putAll(entries)
|
||||
}
|
||||
return conflicts
|
||||
} finally {
|
||||
commit.close()
|
||||
}
|
||||
}
|
||||
|
||||
fun size(commit: Commit<Commands.Size>): Int {
|
||||
try {
|
||||
return databaseTransaction(db) { map.size }
|
||||
} finally {
|
||||
commit.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes out all [map] entries to disk. Note that this operation does not load all entries into memory, as the
|
||||
* [SnapshotWriter] is using a disk-backed buffer internally, and iterating map entries results in only a
|
||||
* fixed number of recently accessed entries to ever be kept in memory.
|
||||
*/
|
||||
override fun snapshot(writer: SnapshotWriter) {
|
||||
databaseTransaction(db) {
|
||||
writer.writeInt(map.size)
|
||||
map.entries.forEach { writer.writeObject(it.key to it.value) }
|
||||
}
|
||||
}
|
||||
|
||||
/** Reads entries from disk and adds them to [map]. */
|
||||
override fun install(reader: SnapshotReader) {
|
||||
val size = reader.readInt()
|
||||
databaseTransaction(db) {
|
||||
map.clear()
|
||||
// TODO: read & put entries in batches
|
||||
for (i in 1..size) {
|
||||
val (key, value) = reader.readObject<Pair<K, V>>()
|
||||
map.put(key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,127 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import io.atomix.catalyst.transport.Address
|
||||
import io.atomix.catalyst.transport.NettyTransport
|
||||
import io.atomix.catalyst.transport.SslProtocol
|
||||
import io.atomix.catalyst.transport.Transport
|
||||
import io.atomix.copycat.client.CopycatClient
|
||||
import io.atomix.copycat.server.CopycatServer
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.services.UniquenessException
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.NodeSSLConfiguration
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* A uniqueness provider that records committed input states in a distributed collection replicated and
|
||||
* persisted in a Raft cluster, using the Copycat framework (http://atomix.io/copycat/).
|
||||
*
|
||||
* The uniqueness provider maintains both a Copycat cluster node (server) and a client through which it can submit
|
||||
* requests to the cluster. In Copycat, a client request is first sent to the server it's connected to and then redirected
|
||||
* to the cluster leader to be actioned.
|
||||
*
|
||||
* @param storagePath Directory storing the Raft log and state machine snapshots
|
||||
* @param myAddress Address of the Copycat node run by this Corda node
|
||||
* @param clusterAddresses List of node addresses in the existing Copycat cluster. At least one active node must be
|
||||
* provided to join the cluster. If empty, a new cluster will be bootstrapped.
|
||||
* @param db The database to store the state machine state in
|
||||
* @param config SSL configuration
|
||||
*/
|
||||
@ThreadSafe
|
||||
class RaftUniquenessProvider(storagePath: Path, myAddress: HostAndPort, clusterAddresses: List<HostAndPort>,
|
||||
db: Database, config: NodeSSLConfiguration) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = loggerFor<RaftUniquenessProvider>()
|
||||
}
|
||||
|
||||
private val _clientFuture: CompletableFuture<CopycatClient>
|
||||
/**
|
||||
* Copycat clients are responsible for connecting to the cluster and submitting commands and queries that operate
|
||||
* on the cluster's replicated state machine.
|
||||
*/
|
||||
private val client: CopycatClient
|
||||
get() = _clientFuture.get()
|
||||
|
||||
init {
|
||||
log.info("Creating Copycat server, log stored in: ${storagePath.toFile()}")
|
||||
val stateMachineFactory = { DistributedImmutableMap<String, ByteArray>(db) }
|
||||
val address = Address(myAddress.hostText, myAddress.port)
|
||||
val storage = buildStorage(storagePath)
|
||||
val transport = buildTransport(config)
|
||||
|
||||
val server = CopycatServer.builder(address)
|
||||
.withStateMachine(stateMachineFactory)
|
||||
.withStorage(storage)
|
||||
.withServerTransport(transport)
|
||||
.build()
|
||||
|
||||
val serverFuture = if (clusterAddresses.isNotEmpty()) {
|
||||
log.info("Joining an existing Copycat cluster at $clusterAddresses")
|
||||
val cluster = clusterAddresses.map { Address(it.hostText, it.port) }
|
||||
server.join(cluster)
|
||||
} else {
|
||||
log.info("Bootstrapping a Copycat cluster at $address")
|
||||
server.bootstrap()
|
||||
}
|
||||
|
||||
val client = CopycatClient.builder(address)
|
||||
.withTransport(transport)
|
||||
.build()
|
||||
_clientFuture = serverFuture.thenCompose { client.connect(address) }
|
||||
}
|
||||
|
||||
private fun buildStorage(storagePath: Path): Storage? {
|
||||
return Storage.builder()
|
||||
.withDirectory(storagePath.toFile())
|
||||
.withStorageLevel(StorageLevel.DISK)
|
||||
.build()
|
||||
}
|
||||
|
||||
private fun buildTransport(config: NodeSSLConfiguration): Transport? {
|
||||
return NettyTransport.builder()
|
||||
.withSsl()
|
||||
.withSslProtocol(SslProtocol.TLSv1_2)
|
||||
.withKeyStorePath(config.keyStorePath.toString())
|
||||
.withKeyStorePassword(config.keyStorePassword)
|
||||
.withTrustStorePath(config.trustStorePath.toString())
|
||||
.withTrustStorePassword(config.trustStorePassword)
|
||||
.build()
|
||||
}
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
val entries = states.mapIndexed { i, stateRef -> stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity) }
|
||||
|
||||
log.debug("Attempting to commit input states: ${states.joinToString()}")
|
||||
val commitCommand = DistributedImmutableMap.Commands.PutAll(encode(entries))
|
||||
val conflicts = client.submit(commitCommand).get()
|
||||
|
||||
if (conflicts.isNotEmpty()) throw UniquenessException(UniquenessProvider.Conflict(decode(conflicts)))
|
||||
log.debug("All input states of transaction $txId have been committed")
|
||||
}
|
||||
|
||||
/**
|
||||
* Copycat uses its own serialization framework so we convert and store entries as String -> ByteArray
|
||||
* here to avoid having to define additional serializers for our custom types.
|
||||
*/
|
||||
private fun encode(items: List<Pair<StateRef, UniquenessProvider.ConsumingTx>>): Map<String, ByteArray> {
|
||||
fun StateRef.encoded() = "$txhash:$index"
|
||||
return items.map { it.first.encoded() to it.second.serialize().bits }.toMap()
|
||||
}
|
||||
|
||||
private fun decode(items: Map<String, ByteArray>): Map<StateRef, UniquenessProvider.ConsumingTx> {
|
||||
fun String.toStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) }
|
||||
return items.map { it.key.toStateRef() to it.value.deserialize<UniquenessProvider.ConsumingTx>() }.toMap()
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.node.services.TimestampChecker
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.protocols.ValidatingNotaryProtocol
|
||||
|
||||
/** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
|
||||
class RaftValidatingNotaryService(services: ServiceHubInternal,
|
||||
val timestampChecker: TimestampChecker,
|
||||
val uniquenessProvider: RaftUniquenessProvider) : NotaryService(services) {
|
||||
companion object {
|
||||
val type = ValidatingNotaryService.type.getSubType("raft")
|
||||
}
|
||||
|
||||
override fun createProtocol(otherParty: Party): ValidatingNotaryProtocol {
|
||||
return ValidatingNotaryProtocol(otherParty, timestampChecker, uniquenessProvider)
|
||||
}
|
||||
}
|
@ -0,0 +1,110 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import io.atomix.catalyst.transport.Address
|
||||
import io.atomix.copycat.client.CopycatClient
|
||||
import io.atomix.copycat.server.CopycatServer
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.DistributedImmutableMap
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class DistributedImmutableMapTests {
|
||||
data class Member(val client: CopycatClient, val server: CopycatServer)
|
||||
|
||||
lateinit var cluster: List<Member>
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var transaction: Transaction
|
||||
lateinit var database: Database
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
LogHelper.setLevel("-org.apache.activemq")
|
||||
LogHelper.setLevel(NetworkMapService::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
cluster = setUpCluster()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
LogHelper.reset("org.apache.activemq")
|
||||
LogHelper.reset(NetworkMapService::class)
|
||||
cluster.forEach {
|
||||
it.client.close()
|
||||
it.server.shutdown()
|
||||
}
|
||||
dataSource.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `stores entries correctly`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val entries = mapOf("key1" to "value1", "key2" to "value2")
|
||||
|
||||
val conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).get()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
|
||||
val value1 = client.submit(DistributedImmutableMap.Commands.Get<String, String>("key1"))
|
||||
val value2 = client.submit(DistributedImmutableMap.Commands.Get<String, String>("key2"))
|
||||
|
||||
assertEquals(value1.get(), "value1")
|
||||
assertEquals(value2.get(), "value2")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `returns conflict for duplicate entries`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val entries = mapOf("key1" to "value1", "key2" to "value2")
|
||||
|
||||
var conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).get()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).get()
|
||||
assertTrue { conflict == entries }
|
||||
}
|
||||
|
||||
private fun setUpCluster(nodeCount: Int = 3): List<Member> {
|
||||
val clusterAddress = freeLocalHostAndPort()
|
||||
val cluster = mutableListOf(createReplica(clusterAddress))
|
||||
for (i in 1..nodeCount) cluster.add(createReplica(freeLocalHostAndPort(), clusterAddress))
|
||||
return cluster.map { it.get() }
|
||||
}
|
||||
|
||||
private fun createReplica(myAddress: HostAndPort, clusterAddress: HostAndPort? = null): CompletableFuture<Member> {
|
||||
val storage = Storage.builder().withStorageLevel(StorageLevel.MEMORY).build()
|
||||
val address = Address(myAddress.hostText, myAddress.port)
|
||||
|
||||
val stateMachineFactory = { DistributedImmutableMap<String, ByteArray>(database, "commited_states_${myAddress.port}") }
|
||||
|
||||
val server = CopycatServer.builder(address)
|
||||
.withStateMachine(stateMachineFactory)
|
||||
.withStorage(storage)
|
||||
.build()
|
||||
|
||||
val serverInitFuture = if (clusterAddress != null) {
|
||||
val cluster = Address(clusterAddress.hostText, clusterAddress.port)
|
||||
server.join(cluster)
|
||||
} else {
|
||||
server.bootstrap()
|
||||
}
|
||||
|
||||
val client = CopycatClient.builder(address).build()
|
||||
return serverInitFuture.thenCompose { client.connect(address) }.thenApply { Member(it, server) }
|
||||
}
|
||||
}
|
@ -0,0 +1,173 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.core.contracts.DummyContract
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionType
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.PublicKeyTree
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.tree
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.random63BitValue
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.services.config.ConfigHelper
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import net.corda.protocols.NotaryError
|
||||
import net.corda.protocols.NotaryException
|
||||
import net.corda.protocols.NotaryProtocol
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.File
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.security.KeyPair
|
||||
import java.util.*
|
||||
import java.util.concurrent.ExecutionException
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
// TODO: clean up and rewrite this using DriverDSL
|
||||
class DistributedNotaryTests {
|
||||
val baseDir = "build/notaryTest/${Date()}"
|
||||
val notaryName = "Notary Service"
|
||||
val clusterSize = 3
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
LogHelper.setLevel("-org.apache.activemq")
|
||||
LogHelper.setLevel(NetworkMapService::class)
|
||||
File(baseDir).mkdirs()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
LogHelper.reset("org.apache.activemq")
|
||||
LogHelper.reset(NetworkMapService::class)
|
||||
File(baseDir).deleteRecursively()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should detect double spend`() {
|
||||
val masterNode = createNotaryCluster()
|
||||
val alice = createAliceNode(masterNode.net.myAddress)
|
||||
|
||||
val notaryParty = alice.netMapCache.getAnyNotary(RaftValidatingNotaryService.type)!!
|
||||
|
||||
val stx = run {
|
||||
val notaryNodeKeyPair = databaseTransaction(masterNode.database) { masterNode.services.notaryIdentityKey }
|
||||
val inputState = issueState(alice, notaryParty, notaryNodeKeyPair)
|
||||
val tx = TransactionType.General.Builder(notaryParty).withItems(inputState)
|
||||
val aliceKey = databaseTransaction(alice.database) { alice.services.legalIdentityKey }
|
||||
tx.signWith(aliceKey)
|
||||
tx.toSignedTransaction(false)
|
||||
}
|
||||
|
||||
val buildProtocol = { NotaryProtocol.Client(stx) }
|
||||
|
||||
val firstSpend = alice.services.startProtocol(buildProtocol())
|
||||
firstSpend.resultFuture.get()
|
||||
|
||||
val secondSpend = alice.services.startProtocol(buildProtocol())
|
||||
|
||||
val ex = assertFailsWith(ExecutionException::class) { secondSpend.resultFuture.get() }
|
||||
val error = (ex.cause as NotaryException).error as NotaryError.Conflict
|
||||
assertEquals(error.tx, stx.tx)
|
||||
}
|
||||
|
||||
private fun createNotaryCluster(): Node {
|
||||
val notaryClusterAddress = freeLocalHostAndPort()
|
||||
val keyPairs = (1..clusterSize).map { generateKeyPair() }
|
||||
|
||||
val notaryKeyTree = PublicKeyTree.Builder().addKeys(keyPairs.map { it.public.tree }).build(1)
|
||||
val notaryParty = Party(notaryName, notaryKeyTree).serialize()
|
||||
|
||||
var networkMapAddress: SingleMessageRecipient? = null
|
||||
|
||||
val cluster = keyPairs.mapIndexed { i, keyPair ->
|
||||
val dir = Paths.get(baseDir, "notaryNode$i")
|
||||
Files.createDirectories(dir)
|
||||
|
||||
val privateKeyFile = RaftValidatingNotaryService.type.id + "-private-key"
|
||||
val publicKeyFile = RaftValidatingNotaryService.type.id + "-public"
|
||||
|
||||
notaryParty.writeToFile(dir.resolve(publicKeyFile))
|
||||
keyPair.serialize().writeToFile(dir.resolve(privateKeyFile))
|
||||
|
||||
val node: Node
|
||||
if (networkMapAddress == null) {
|
||||
val config = generateConfig(dir, "node" + random63BitValue(), notaryClusterAddress)
|
||||
node = createNotaryNode(config)
|
||||
networkMapAddress = node.net.myAddress
|
||||
} else {
|
||||
val config = generateConfig(dir, "node" + random63BitValue(), freeLocalHostAndPort(), notaryClusterAddress)
|
||||
node = createNotaryNode(config, networkMapAddress)
|
||||
}
|
||||
|
||||
node
|
||||
}
|
||||
|
||||
return cluster.first()
|
||||
}
|
||||
|
||||
private fun createNotaryNode(config: FullNodeConfiguration, networkMapAddress: SingleMessageRecipient? = null): Node {
|
||||
val extraAdvertisedServices = if (networkMapAddress == null) setOf(ServiceInfo(NetworkMapService.type, "NMS")) else emptySet<ServiceInfo>()
|
||||
|
||||
val notaryNode = Node(
|
||||
configuration = config,
|
||||
advertisedServices = extraAdvertisedServices + ServiceInfo(RaftValidatingNotaryService.type, notaryName),
|
||||
networkMapAddress = networkMapAddress)
|
||||
|
||||
notaryNode.setup().start()
|
||||
thread { notaryNode.run() }
|
||||
notaryNode.networkMapRegistrationFuture.get()
|
||||
return notaryNode
|
||||
}
|
||||
|
||||
private fun createAliceNode(networkMapAddress: SingleMessageRecipient): Node {
|
||||
val aliceDir = Paths.get(baseDir, "alice")
|
||||
val alice = Node(
|
||||
configuration = generateConfig(aliceDir, "Alice"),
|
||||
advertisedServices = setOf(),
|
||||
networkMapAddress = networkMapAddress)
|
||||
alice.setup().start()
|
||||
thread { alice.run() }
|
||||
alice.networkMapRegistrationFuture.get()
|
||||
return alice
|
||||
}
|
||||
|
||||
private fun issueState(node: AbstractNode, notary: Party, notaryKey: KeyPair): StateAndRef<*> {
|
||||
return databaseTransaction(node.database) {
|
||||
val tx = DummyContract.generateInitial(node.info.legalIdentity.ref(0), Random().nextInt(), notary)
|
||||
tx.signWith(node.services.legalIdentityKey)
|
||||
tx.signWith(notaryKey)
|
||||
val stx = tx.toSignedTransaction()
|
||||
node.services.recordTransactions(listOf(stx))
|
||||
StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
|
||||
}
|
||||
}
|
||||
|
||||
private fun generateConfig(dir: Path, name: String, notaryNodeAddress: HostAndPort? = null, notaryClusterAddress: HostAndPort? = null) = FullNodeConfiguration(
|
||||
ConfigHelper.loadConfig(dir,
|
||||
allowMissingConfig = true,
|
||||
configOverrides = mapOf(
|
||||
"myLegalName" to name,
|
||||
"basedir" to dir,
|
||||
"artemisAddress" to freeLocalHostAndPort().toString(),
|
||||
"webAddress" to freeLocalHostAndPort().toString(),
|
||||
"notaryNodeAddress" to notaryNodeAddress?.toString(),
|
||||
"notaryClusterAddresses" to (if (notaryClusterAddress == null) emptyList<String>() else listOf(notaryClusterAddress.toString()))
|
||||
)))
|
||||
}
|
Reference in New Issue
Block a user