ENT-2822: Move experimental raft and bft-smart notaries back into node, fix reference state support (#4509)

Move Raft and BFT-Smart notaries back into node to preserve backwards compatibility.

* Allow overriding full node config when using internal mock network parameters.

* Make BFT-Smart notary start up in prod mode as well

* Move raft & bftsmart notaries to net.corda.notary.experimental package

* Make sure Raft notary handles reference state edge cases correctly.

* Make sure BFT-Smart notary handles reference state edge cases correctly.

* Include notary schemas in node internal schemas

* Undo Raft notary table schema changes to maintain compatibility.
This commit is contained in:
Andrius Dagys 2019-01-09 15:52:42 +00:00 committed by GitHub
parent 4530a5e982
commit fa025dedeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 732 additions and 421 deletions

View File

@ -383,8 +383,6 @@ bintrayConfig {
'corda-tools-explorer',
'corda-tools-network-bootstrapper',
'corda-tools-cliutils',
'corda-notary-raft',
'corda-notary-bft-smart',
'corda-common-configuration-parsing',
'corda-common-validation'
]

View File

@ -1,5 +1,7 @@
package net.corda.core.node
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.generateKeyPair
import net.corda.core.internal.getPackageOwnerOf
import net.corda.core.utilities.OpaqueBytes
@ -7,16 +9,15 @@ import net.corda.core.utilities.days
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueFlow
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.MockNetNotaryConfig
import net.corda.testing.node.MockNetworkNotarySpec
import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.MockNodeConfigOverrides
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.MOCK_VERSION_INFO
@ -66,8 +67,14 @@ class NetworkParametersTest {
// Notaries tests
@Test
fun `choosing notary not specified in network parameters will fail`() {
val fakeNotary = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME,
configOverrides = MockNodeConfigOverrides(notary = MockNetNotaryConfig(validating = false))))
val fakeNotary = mockNet.createNode(
InternalMockNodeParameters(
legalName = BOB_NAME,
configOverrides = {
doReturn(NotaryConfig(validating = false)).whenever(it).notary
}
)
)
val fakeNotaryId = fakeNotary.info.singleIdentity()
val alice = mockNet.createPartyNode(ALICE_NAME)
assertThat(alice.services.networkMapCache.notaryIdentities).doesNotContain(fakeNotaryId)

View File

@ -86,6 +86,9 @@ Version 4.0
* Introduced new optional network bootstrapper command line options (--register-package-owner, --unregister-package-owner)
to register/unregister a java package namespace with an associated owner in the network parameter packageOwnership whitelist.
* BFT-Smart and Raft notary implementations have been move to the ``net.corda.notary.experimental`` package to emphasise
their experimental nature.
* New "validate-configuration" sub-command to `corda.jar`, allowing to validate the actual node configuration without starting the node.
* CorDapps now have the ability to specify a minimum platform version in their MANIFEST.MF to prevent old nodes from loading them.
@ -104,34 +107,8 @@ Version 4.0
* Introduced new optional network bootstrapper command line option (--minimum-platform-version) to set as a network parameter
* BFT-Smart and Raft notary implementations have been extracted out of node into ``experimental`` CorDapps to emphasise
their experimental nature. Moreover, the BFT-Smart notary will only work in dev mode due to its use of Java serialization.
* Vault storage of contract state constraints metadata and associated vault query functions to retrieve and sort by constraint type.
* UPGRADE REQUIRED: changes have been made to how notary implementations are configured and loaded.
No upgrade steps are required for the single-node notary (both validating and non-validating variants).
Other notary implementations have been moved out of the Corda node into individual Cordapps, and require configuration
file updates.
To run a notary you will now need to include the appropriate notary CorDapp in the ``cordapps/`` directory:
* ``corda-notary-raft`` for the Raft notary.
* ``corda-notary-bft-smart`` for the BFT-Smart notary.
It is now required to specify the fully qualified notary service class name, ``className``, and the legal name of
the notary service in case of distributed notaries: ``serviceLegalName``.
Implementation-specific configuration values have been moved to the ``extraConfig`` configuration block.
Example configuration changes for the Raft notary:
.. image:: resources/notary-config-update.png
Example configuration changes for the BFT-Smart notary:
.. image:: resources/notary-config-update-bft.png
* New overload for ``CordaRPCClient.start()`` method allowing to specify target legal identity to use for RPC call.
* Case insensitive vault queries can be specified via a boolean on applicable SQL criteria builder operators. By default

View File

@ -110,8 +110,6 @@ Here is an overview of the various Corda dependencies:
frameworks
* ``corda-node-api`` - The node API. Required to bootstrap a local network
* ``corda-node-driver`` - Testing utility for programmatically starting nodes from JVM languages. Use for tests
* ``corda-notary-bft-smart`` - A Corda notary implementation
* ``corda-notary-raft`` - A Corda notary implementation
* ``corda-rpc`` - The Corda RPC client library. Used when writing an RPC client
* ``corda-serialization`` - The Corda core serialization library. Automatically included by other dependencies
* ``corda-serialization-deterministic`` - The Corda core serialization library. Automatically included by other

View File

@ -1,47 +0,0 @@
apply plugin: 'kotlin'
apply plugin: 'kotlin-jpa'
apply plugin: 'idea'
apply plugin: 'net.corda.plugins.cordapp'
apply plugin: 'net.corda.plugins.publish-utils'
apply plugin: 'com.jfrog.artifactory'
configurations {
integrationTestCompile.extendsFrom testCompile
integrationTestRuntime.extendsFrom testRuntime
}
dependencies {
cordaCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
// Corda integration dependencies
cordaCompile project(':node')
// BFT-SMaRt
compile 'commons-codec:commons-codec:1.10'
compile 'com.github.bft-smart:library:master-v1.1-beta-g6215ec8-87'
testCompile "junit:junit:$junit_version"
testCompile project(':node-driver')
}
idea {
module {
downloadJavadoc = true // defaults to false
downloadSources = true
}
}
publish {
name 'corda-notary-bft-smart'
}
cordapp {
targetPlatformVersion corda_platform_version.toInteger()
minimumPlatformVersion 1
workflow {
name "net/corda/experimental/notary-bft-smart"
versionId 1
vendor "R3"
licence "Open Source (Apache 2)"
}
}

View File

@ -1,47 +0,0 @@
apply plugin: 'kotlin'
apply plugin: 'idea'
apply plugin: 'net.corda.plugins.cordapp'
apply plugin: 'net.corda.plugins.publish-utils'
apply plugin: 'com.jfrog.artifactory'
configurations {
integrationTestCompile.extendsFrom testCompile
integrationTestRuntime.extendsFrom testRuntime
}
dependencies {
cordaCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
// Corda integration dependencies
cordaCompile project(':node')
// Java Atomix: RAFT library
compile 'io.atomix.copycat:copycat-client:1.2.8'
compile 'io.atomix.copycat:copycat-server:1.2.8'
compile 'io.atomix.catalyst:catalyst-netty:1.2.1'
testCompile "junit:junit:$junit_version"
testCompile project(':node-driver')
}
idea {
module {
downloadJavadoc = true // defaults to false
downloadSources = true
}
}
publish {
name 'corda-notary-raft'
}
cordapp {
targetPlatformVersion corda_platform_version.toInteger()
minimumPlatformVersion 1
workflow {
name "net/corda/experimental/notary-raft"
versionId 1
vendor "R3"
licence "Open Source (Apache 2)"
}
}

View File

@ -163,7 +163,7 @@ internal constructor(private val initSerEnv: Boolean,
private fun isBFTNotary(config: Config): Boolean {
// TODO: pass a commandline parameter to the bootstrapper instead. Better yet, a notary config map
// specifying the notary identities and the type (single-node, CFT, BFT) of each notary to set up.
return config.getString("notary.className").contains("BFT", true)
return config.hasPath("notary.bftSMaRt")
}
private fun generateServiceIdentitiesForNotaryClusters(configs: Map<Path, Config>) {

View File

@ -158,6 +158,10 @@ class SchemaMigration(
if (schemas.any { schema -> schema.migrationResource == "node-notary.changelog-master" })
preV4Baseline.addAll(listOf("migration/node-notary.changelog-init.xml",
"migration/node-notary.changelog-v1.xml"))
if (schemas.any { schema -> schema.migrationResource == "notary-raft.changelog-master" })
preV4Baseline.addAll(listOf("migration/notary-raft.changelog-init.xml",
"migration/notary-raft.changelog-v1.xml"))
}
if (isFinanceAppWithLiquibaseNotMigrated) {
preV4Baseline.addAll(listOf("migration/cash.changelog-init.xml",

View File

@ -168,6 +168,15 @@ dependencies {
// AgentLoader: dynamic loading of JVM agents
compile group: 'com.ea.agentloader', name: 'ea-agent-loader', version: "${eaagentloader_version}"
// BFT-Smart dependencies
compile 'commons-codec:commons-codec:1.10'
compile 'com.github.bft-smart:library:master-v1.1-beta-g6215ec8-87'
// Java Atomix: RAFT library
compile 'io.atomix.copycat:copycat-client:1.2.3'
compile 'io.atomix.copycat:copycat-server:1.2.3'
compile 'io.atomix.catalyst:catalyst-netty:1.1.2'
// Jetty dependencies for NetworkMapClient test.
// Web stuff: for HTTP[S] servlets
testCompile "org.eclipse.jetty:jetty-servlet:${jetty_version}"

View File

@ -33,7 +33,6 @@ import net.corda.core.utilities.days
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.node.CordaClock
import net.corda.node.SerialFilter
import net.corda.node.VersionInfo
import net.corda.node.cordapp.CordappLoader
import net.corda.node.internal.classloading.requireAnnotation
@ -63,7 +62,6 @@ import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.*
import net.corda.node.services.persistence.AttachmentStorageInternal
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.*
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
@ -94,7 +92,6 @@ import java.lang.reflect.InvocationTargetException
import java.nio.file.Paths
import java.security.KeyPair
import java.security.KeyStoreException
import java.security.PublicKey
import java.security.cert.X509Certificate
import java.sql.Connection
import java.time.Clock
@ -146,6 +143,9 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
}
private val notaryLoader = configuration.notary?.let {
NotaryLoader(it, versionInfo)
}
val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize()
val identityService = PersistentIdentityService(cacheFactory).tokenize()
@ -374,8 +374,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
// the identity key. But the infrastructure to make that easy isn't here yet.
keyManagementService.start(keyPairs)
val notaryService = makeNotaryService(myNotaryIdentity)
installCordaServices(myNotaryIdentity)
val notaryService = maybeStartNotaryService(myNotaryIdentity)
installCordaServices()
contractUpgradeService.start()
vaultService.start()
ScheduledActivityObserver.install(vaultService, schedulerService, flowLogicRefFactory)
@ -538,12 +538,11 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader {
val generatedCordapps = mutableListOf(VirtualCordapp.generateCoreCordapp(versionInfo))
if (isRunningSimpleNotaryService(configuration)) {
// For backwards compatibility purposes the single node notary implementation is built-in: a virtual
// CorDapp will be generated.
generatedCordapps += VirtualCordapp.generateSimpleNotaryCordapp(versionInfo)
val generatedCordapps = mutableListOf(VirtualCordapp.generateCore(versionInfo))
notaryLoader?.builtInNotary?.let { notaryImpl ->
generatedCordapps += notaryImpl
}
val blacklistedKeys = if (configuration.devMode) emptyList()
else configuration.cordappSignerKeyFingerprintBlacklist.map {
try {
@ -567,7 +566,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
private class ServiceInstantiationException(cause: Throwable?) : CordaException("Service Instantiation Error", cause)
private fun installCordaServices(myNotaryIdentity: PartyAndCertificate?) {
private fun installCordaServices() {
val loadedServices = cordappLoader.cordapps.flatMap { it.services }
loadedServices.forEach {
try {
@ -782,20 +781,10 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
logVendorString(database, log)
}
private fun makeNotaryService(myNotaryIdentity: PartyAndCertificate?): NotaryService? {
return configuration.notary?.let { notaryConfig ->
val serviceClass = getNotaryServiceClass(notaryConfig.className)
log.info("Starting notary service: $serviceClass")
val notaryKey = myNotaryIdentity?.owningKey
?: throw IllegalArgumentException("Unable to start notary service $serviceClass: notary identity not found")
/** Some notary implementations only work with Java serialization. */
maybeInstallSerializationFilter(serviceClass)
val constructor = serviceClass.getDeclaredConstructor(ServiceHubInternal::class.java, PublicKey::class.java)
.apply { isAccessible = true }
val service = constructor.newInstance(services, notaryKey) as NotaryService
/** Loads and starts a notary service if it is configured. */
private fun maybeStartNotaryService(myNotaryIdentity: PartyAndCertificate?): NotaryService? {
return notaryLoader?.let { loader ->
val service = loader.loadService(myNotaryIdentity, services, cordappLoader)
service.run {
tokenize()
@ -807,30 +796,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
}
/** Installs a custom serialization filter defined by a notary service implementation. Only supported in dev mode. */
private fun maybeInstallSerializationFilter(serviceClass: Class<out NotaryService>) {
try {
@Suppress("UNCHECKED_CAST")
val filter = serviceClass.getDeclaredMethod("getSerializationFilter").invoke(null) as ((Class<*>) -> Boolean)
if (configuration.devMode) {
log.warn("Installing a custom Java serialization filter, required by ${serviceClass.name}. " +
"Note this is only supported in dev mode a production node will fail to start if serialization filters are used.")
SerialFilter.install(filter)
} else {
throw UnsupportedOperationException("Unable to install a custom Java serialization filter, not in dev mode.")
}
} catch (e: NoSuchMethodException) {
// No custom serialization filter declared
}
}
private fun getNotaryServiceClass(className: String): Class<out NotaryService> {
val loadedImplementations = cordappLoader.cordapps.mapNotNull { it.notaryService }
log.debug("Notary service implementations found: ${loadedImplementations.joinToString(", ")}")
return loadedImplementations.firstOrNull { it.name == className }
?: throw IllegalArgumentException("The notary service implementation specified in the configuration: $className is not found. Available implementations: ${loadedImplementations.joinToString(", ")}}")
}
protected open fun makeKeyManagementService(identityService: PersistentIdentityService): KeyManagementServiceInternal {
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with

View File

@ -8,6 +8,10 @@ import net.corda.core.internal.location
import net.corda.node.VersionInfo
import net.corda.node.services.transactions.NodeNotarySchemaV1
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.notary.experimental.bftsmart.BFTSmartNotarySchemaV1
import net.corda.notary.experimental.bftsmart.BFTSmartNotaryService
import net.corda.notary.experimental.raft.RaftNotarySchemaV1
import net.corda.notary.experimental.raft.RaftNotaryService
internal object VirtualCordapp {
/** A list of the core RPC flows present in Corda */
@ -18,7 +22,7 @@ internal object VirtualCordapp {
)
/** A Cordapp representing the core package which is not scanned automatically. */
fun generateCoreCordapp(versionInfo: VersionInfo): CordappImpl {
fun generateCore(versionInfo: VersionInfo): CordappImpl {
return CordappImpl(
contractClassNames = listOf(),
initiatedFlows = listOf(),
@ -33,7 +37,7 @@ internal object VirtualCordapp {
allFlows = listOf(),
jarPath = ContractUpgradeFlow.javaClass.location, // Core JAR location
jarHash = SecureHash.allOnesHash,
minimumPlatformVersion = 1,
minimumPlatformVersion = versionInfo.platformVersion,
targetPlatformVersion = versionInfo.platformVersion,
notaryService = null,
isLoaded = false
@ -41,7 +45,7 @@ internal object VirtualCordapp {
}
/** A Cordapp for the built-in notary service implementation. */
fun generateSimpleNotaryCordapp(versionInfo: VersionInfo): CordappImpl {
fun generateSimpleNotary(versionInfo: VersionInfo): CordappImpl {
return CordappImpl(
contractClassNames = listOf(),
initiatedFlows = listOf(),
@ -56,10 +60,56 @@ internal object VirtualCordapp {
allFlows = listOf(),
jarPath = SimpleNotaryService::class.java.location,
jarHash = SecureHash.allOnesHash,
minimumPlatformVersion = 1,
minimumPlatformVersion = versionInfo.platformVersion,
targetPlatformVersion = versionInfo.platformVersion,
notaryService = SimpleNotaryService::class.java,
isLoaded = false
)
}
/** A Cordapp for the built-in Raft notary service implementation. */
fun generateRaftNotary(versionInfo: VersionInfo): CordappImpl {
return CordappImpl(
contractClassNames = listOf(),
initiatedFlows = listOf(),
rpcFlows = listOf(),
serviceFlows = listOf(),
schedulableFlows = listOf(),
services = listOf(),
serializationWhitelists = listOf(),
serializationCustomSerializers = listOf(),
customSchemas = setOf(RaftNotarySchemaV1),
info = Cordapp.Info.Default("corda-notary-raft", versionInfo.vendor, versionInfo.releaseVersion, "Open Source (Apache 2)"),
allFlows = listOf(),
jarPath = RaftNotaryService::class.java.location,
jarHash = SecureHash.allOnesHash,
minimumPlatformVersion = versionInfo.platformVersion,
targetPlatformVersion = versionInfo.platformVersion,
notaryService = RaftNotaryService::class.java,
isLoaded = false
)
}
/** A Cordapp for the built-in BFT-Smart notary service implementation. */
fun generateBFTSmartNotary(versionInfo: VersionInfo): CordappImpl {
return CordappImpl(
contractClassNames = listOf(),
initiatedFlows = listOf(),
rpcFlows = listOf(),
serviceFlows = listOf(),
schedulableFlows = listOf(),
services = listOf(),
serializationWhitelists = listOf(),
serializationCustomSerializers = listOf(),
customSchemas = setOf(BFTSmartNotarySchemaV1),
info = Cordapp.Info.Default("corda-notary-bft-smart", versionInfo.vendor, versionInfo.releaseVersion, "Open Source (Apache 2)"),
allFlows = listOf(),
jarPath = BFTSmartNotaryService::class.java.location,
jarHash = SecureHash.allOnesHash,
minimumPlatformVersion = versionInfo.platformVersion,
targetPlatformVersion = versionInfo.platformVersion,
notaryService = BFTSmartNotaryService::class.java,
isLoaded = false
)
}
}

View File

@ -16,6 +16,8 @@ import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.cryptoservice.CryptoService
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.experimental.bftsmart.BFTSmartConfig
import net.corda.notary.experimental.raft.RaftConfig
import net.corda.tools.shell.SSHDConfiguration
import java.net.URL
import java.nio.file.Path
@ -141,7 +143,7 @@ data class NotaryConfig(
/** The legal name of cluster in case of a distributed notary service. */
val serviceLegalName: CordaX500Name? = null,
/** The name of the notary service class to load. */
val className: String = "net.corda.node.services.transactions.SimpleNotaryService",
val className: String? = null,
/**
* If the wait time estimate on the internal queue exceeds this value, the notary may send
* a wait time update to the client (implementation specific and dependent on the counter
@ -149,7 +151,9 @@ data class NotaryConfig(
*/
val etaMessageThresholdSeconds: Int = NotaryServiceFlow.defaultEstimatedWaitTime.seconds.toInt(),
/** Notary implementation-specific configuration parameters. */
val extraConfig: Config? = null
val extraConfig: Config? = null,
val raft: RaftConfig? = null,
val bftSMaRt: BFTSmartConfig? = null
)
/**

View File

@ -43,6 +43,8 @@ import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.TransactionIsolationLevel
import net.corda.nodeapi.internal.persistence.SchemaInitializationType
import net.corda.notary.experimental.bftsmart.BFTSmartConfig
import net.corda.notary.experimental.raft.RaftConfig
import net.corda.tools.shell.SSHDConfiguration
internal object UserSpec : Configuration.Specification<User>("User") {
@ -165,15 +167,38 @@ internal object FlowTimeoutConfigurationSpec : Configuration.Specification<FlowT
internal object NotaryConfigSpec : Configuration.Specification<NotaryConfig>("NotaryConfig") {
private val validating by boolean()
private val serviceLegalName by string().mapValid(::toCordaX500Name).optional()
private val className by string().optional().withDefaultValue("net.corda.node.services.transactions.SimpleNotaryService")
private val className by string().optional()
private val etaMessageThresholdSeconds by int().optional().withDefaultValue(NotaryServiceFlow.defaultEstimatedWaitTime.seconds.toInt())
private val extraConfig by nestedObject().map(ConfigObject::toConfig).optional()
private val raft by nested(RaftConfigSpec).optional()
private val bftSMaRt by nested(BFTSmartConfigSpec).optional()
override fun parseValid(configuration: Config): Valid<NotaryConfig> {
return valid(NotaryConfig(configuration[validating], configuration[serviceLegalName], configuration[className], configuration[etaMessageThresholdSeconds], configuration[extraConfig]))
return valid(NotaryConfig(configuration[validating], configuration[serviceLegalName], configuration[className], configuration[etaMessageThresholdSeconds], configuration[extraConfig], configuration[raft], configuration[bftSMaRt]))
}
}
internal object RaftConfigSpec : Configuration.Specification<RaftConfig>("RaftConfig") {
private val nodeAddress by string().mapValid(::toNetworkHostAndPort)
private val clusterAddresses by string().mapValid(::toNetworkHostAndPort).listOrEmpty()
override fun parseValid(configuration: Config): Valid<RaftConfig> {
return valid(RaftConfig(configuration[nodeAddress], configuration[clusterAddresses]))
}
}
internal object BFTSmartConfigSpec : Configuration.Specification<BFTSmartConfig>("BFTSmartConfig") {
private val replicaId by int()
private val clusterAddresses by string().mapValid(::toNetworkHostAndPort).listOrEmpty()
private val debug by boolean().optional().withDefaultValue(false)
private val exposeRaces by boolean().optional().withDefaultValue(false)
override fun parseValid(configuration: Config): Valid<BFTSmartConfig> {
return valid(BFTSmartConfig(configuration[replicaId], configuration[clusterAddresses], configuration[debug], configuration[exposeRaces]))
}
}
internal object NodeRpcSettingsSpec : Configuration.Specification<NodeRpcSettings>("NodeRpcSettings") {
internal object BrokerRpcSslOptionsSpec : Configuration.Specification<BrokerRpcSslOptions>("BrokerRpcSslOptions") {
private val keyStorePath by string().mapValid(::toPath)

View File

@ -60,7 +60,8 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
fun internalSchemas() = requiredSchemas.keys + extraSchemas.filter { schema -> // when mapped schemas from the finance module are present, they are considered as internal ones
schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" ||
schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" ||
schema::class.qualifiedName == "net.corda.node.services.transactions.NodeNotarySchemaV1"
schema::class.qualifiedName == "net.corda.node.services.transactions.NodeNotarySchemaV1" ||
schema::class.qualifiedName?.startsWith("net.corda.notary.") ?: false
}
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas + extraSchemas.associateBy({ it }, { SchemaOptions() })

View File

@ -0,0 +1,93 @@
package net.corda.node.utilities
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.cordapp.CordappImpl
import net.corda.core.internal.notary.NotaryService
import net.corda.core.utilities.contextLogger
import net.corda.node.SerialFilter
import net.corda.node.VersionInfo
import net.corda.node.cordapp.CordappLoader
import net.corda.node.internal.cordapp.VirtualCordapp
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.notary.experimental.bftsmart.BFTSmartNotaryService
import net.corda.notary.experimental.raft.RaftNotaryService
import java.security.PublicKey
class NotaryLoader(
private val config: NotaryConfig,
versionInfo: VersionInfo
) {
companion object {
private val log = contextLogger()
}
/**
* A virtual CorDapp containing the notary implementation if one of the built-in notaries is used.
* [Null] if a notary implementation is expected to be loaded from an external CorDapp.
*/
val builtInNotary: CordappImpl?
private val builtInServiceClass: Class<out NotaryService>?
init {
builtInServiceClass = if (config.className.isNullOrBlank()) {
// Using a built-in notary
when {
config.bftSMaRt != null -> {
builtInNotary = VirtualCordapp.generateBFTSmartNotary(versionInfo)
BFTSmartNotaryService::class.java
}
config.raft != null -> {
builtInNotary = VirtualCordapp.generateRaftNotary(versionInfo)
RaftNotaryService::class.java
}
else -> {
builtInNotary = VirtualCordapp.generateSimpleNotary(versionInfo)
SimpleNotaryService::class.java
}
}
} else {
// Using a notary from an external CorDapp
builtInNotary = null
null
}
}
fun loadService(myNotaryIdentity: PartyAndCertificate?, services: ServiceHubInternal, cordappLoader: CordappLoader): NotaryService {
val serviceClass = builtInServiceClass ?: scanCorDapps(cordappLoader)
log.info("Starting notary service: $serviceClass")
val notaryKey = myNotaryIdentity?.owningKey
?: throw IllegalArgumentException("Unable to start notary service $serviceClass: notary identity not found")
/** Some notary implementations only work with Java serialization. */
maybeInstallSerializationFilter(serviceClass)
val constructor = serviceClass
.getDeclaredConstructor(ServiceHubInternal::class.java, PublicKey::class.java)
.apply { isAccessible = true }
return constructor.newInstance(services, notaryKey)
}
/** Looks for the config specified notary service implementation in loaded CorDapps. This mechanism is for internal use only. */
private fun scanCorDapps(cordappLoader: CordappLoader): Class<out NotaryService> {
val loadedImplementations = cordappLoader.cordapps.mapNotNull { it.notaryService }
log.debug("Notary service implementations found: ${loadedImplementations.joinToString(", ")}")
return loadedImplementations.firstOrNull { it.name == config.className }
?: throw IllegalArgumentException("The notary service implementation specified in the configuration: ${config.className} is not found. Available implementations: ${loadedImplementations.joinToString(", ")}}")
}
/** Installs a custom serialization filter defined by a notary service implementation. Only supported in dev mode. */
private fun maybeInstallSerializationFilter(serviceClass: Class<out NotaryService>) {
try {
@Suppress("UNCHECKED_CAST")
val filter = serviceClass
.getDeclaredMethod("getSerializationFilter")
.invoke(null) as ((Class<*>) -> Boolean)
SerialFilter.install(filter)
} catch (e: NoSuchMethodException) {
// No custom serialization filter declared
}
}
}

View File

@ -1,4 +1,4 @@
package net.corda.notary.bftsmart
package net.corda.notary.experimental.bftsmart
import bftsmart.communication.ServerCommunicationSystem
import bftsmart.communication.client.netty.NettyClientServerCommunicationSystemClientSide
@ -37,8 +37,8 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.notary.bftsmart.BFTSMaRt.Client
import net.corda.notary.bftsmart.BFTSMaRt.Replica
import net.corda.notary.experimental.bftsmart.BFTSmart.Client
import net.corda.notary.experimental.bftsmart.BFTSmart.Replica
import java.nio.file.Path
import java.security.PublicKey
import java.util.*
@ -55,7 +55,7 @@ import java.util.*
// perhaps a design doc. In general, it seems possible to use the state machine to reconfigure the cluster (reaching
// consensus about membership changes). Nodes that join the cluster for the first time or re-join can go through
// a "recovering" state and request missing data from their peers.
object BFTSMaRt {
object BFTSmart {
/** Sent from [Client] to [Replica]. */
@CordaSerializable
data class CommitRequest(val payload: NotarisationPayload, val callerIdentity: Party)
@ -79,7 +79,7 @@ object BFTSMaRt {
fun waitUntilAllReplicasHaveInitialized()
}
class Client(config: BFTSMaRtConfig, private val clientId: Int, private val cluster: Cluster, private val notaryService: BftSmartNotaryService) : SingletonSerializeAsToken() {
class Client(config: BFTSmartConfigInternal, private val clientId: Int, private val cluster: Cluster, private val notaryService: BFTSmartNotaryService) : SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
}
@ -176,10 +176,10 @@ object BFTSMaRt {
*
* The validation logic can be specified by implementing the [executeCommand] method.
*/
abstract class Replica(config: BFTSMaRtConfig,
abstract class Replica(config: BFTSmartConfigInternal,
replicaId: Int,
createMap: () -> AppendOnlyPersistentMap<StateRef, SecureHash,
BftSmartNotaryService.CommittedState, PersistentStateRef>,
BFTSmartNotaryService.CommittedState, PersistentStateRef>,
protected val services: ServiceHubInternal,
protected val notaryIdentityKey: PublicKey) : DefaultRecoverable() {
companion object {
@ -203,7 +203,7 @@ object BFTSMaRt {
private val replica = run {
config.waitUntilReplicaWillNotPrintStackTrace(replicaId)
@Suppress("LeakingThis")
CordaServiceReplica(replicaId, config.path, this)
(CordaServiceReplica(replicaId, config.path, this))
}
fun dispose() {
@ -251,22 +251,61 @@ object BFTSMaRt {
checkConflict(conflictingStates, references, StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
if (conflictingStates.isNotEmpty()) {
if (!isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
log.debug { "Failure, input states or references already committed: ${conflictingStates.keys}" }
throw NotaryInternalException(NotaryError.Conflict(txId, conflictingStates))
if (states.isEmpty()) {
handleReferenceConflicts(txId, conflictingStates)
} else {
handleConflicts(txId, conflictingStates)
}
} else {
val outsideTimeWindowError = validateTimeWindow(services.clock.instant(), timeWindow)
if (outsideTimeWindowError == null) {
states.forEach { commitLog[it] = txId }
log.debug { "Successfully committed all input states: $states" }
} else {
throw NotaryInternalException(outsideTimeWindowError)
}
handleNoConflicts(timeWindow, states, txId)
}
}
}
private fun previouslyCommitted(txId: SecureHash): Boolean {
val session = currentDBSession()
return session.find(BFTSmartNotaryService.CommittedTransaction::class.java, txId.toString()) != null
}
private fun handleReferenceConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
if (!previouslyCommitted(txId)) {
val conflictError = NotaryError.Conflict(txId, conflictingStates)
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
throw NotaryInternalException(conflictError)
}
log.debug { "Transaction $txId already notarised" }
}
private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
log.debug { "Transaction $txId already notarised" }
return
} else {
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
val conflictError = NotaryError.Conflict(txId, conflictingStates)
throw NotaryInternalException(conflictError)
}
}
private fun handleNoConflicts(timeWindow: TimeWindow?, states: List<StateRef>, txId: SecureHash) {
// Skip if this is a re-notarisation of a reference-only transaction
if (states.isEmpty() && previouslyCommitted(txId)) {
return
}
val outsideTimeWindowError = validateTimeWindow(services.clock.instant(), timeWindow)
if (outsideTimeWindowError == null) {
states.forEach { stateRef ->
commitLog[stateRef] = txId
}
val session = currentDBSession()
session.persist(BFTSmartNotaryService.CommittedTransaction(txId.toString()))
log.debug { "Successfully committed all input states: $states" }
} else {
throw NotaryInternalException(outsideTimeWindowError)
}
}
private fun logRequest(txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) {
val request = PersistentUniquenessProvider.Request(
consumingTxHash = txId.toString(),

View File

@ -1,4 +1,4 @@
package net.corda.notary.bftsmart
package net.corda.notary.experimental.bftsmart
import net.corda.core.internal.div
import net.corda.core.internal.writer
@ -13,7 +13,7 @@ import java.net.SocketException
import java.nio.file.Files
import java.util.concurrent.TimeUnit.MILLISECONDS
data class BFTSMaRtConfiguration(
data class BFTSmartConfig(
/** The zero-based index of the current replica. All replicas must specify a unique replica id. */
val replicaId: Int,
/**
@ -32,11 +32,11 @@ data class BFTSMaRtConfiguration(
}
/**
* BFT SMaRt can only be configured via files in a configHome directory.
* BFT Smart can only be configured via files in a configHome directory.
* Each instance of this class creates such a configHome, accessible via [path].
* The files are deleted on [close] typically via [use], see [PathManager] for details.
*/
class BFTSMaRtConfig(private val replicaAddresses: List<NetworkHostAndPort>, debug: Boolean, val exposeRaces: Boolean) : PathManager<BFTSMaRtConfig>(Files.createTempDirectory("bft-smart-config")) {
class BFTSmartConfigInternal(private val replicaAddresses: List<NetworkHostAndPort>, debug: Boolean, val exposeRaces: Boolean) : PathManager<BFTSmartConfigInternal>(Files.createTempDirectory("bft-smart-config")) {
companion object {
private val log = contextLogger()
internal const val portIsClaimedFormat = "Port %s is claimed by another replica: %s"
@ -81,7 +81,7 @@ class BFTSMaRtConfig(private val replicaAddresses: List<NetworkHostAndPort>, deb
val peerId = contextReplicaId - 1
if (peerId < 0) return
// The printStackTrace we want to avoid is in replica-replica communication code:
val address = BFTSMaRtPort.FOR_REPLICAS.ofReplica(replicaAddresses[peerId])
val address = BFTSmartPort.FOR_REPLICAS.ofReplica(replicaAddresses[peerId])
log.debug { "Waiting for replica $peerId to start listening on: $address" }
while (!address.isListening()) MILLISECONDS.sleep(200)
log.debug { "Replica $peerId is ready for P2P." }
@ -89,11 +89,11 @@ class BFTSMaRtConfig(private val replicaAddresses: List<NetworkHostAndPort>, deb
private fun replicaPorts(replicaId: Int): List<NetworkHostAndPort> {
val base = replicaAddresses[replicaId]
return BFTSMaRtPort.values().map { it.ofReplica(base) }
return BFTSmartPort.values().map { it.ofReplica(base) }
}
}
private enum class BFTSMaRtPort(private val off: Int) {
private enum class BFTSmartPort(private val off: Int) {
FOR_CLIENTS(0),
FOR_REPLICAS(1);

View File

@ -1,4 +1,4 @@
package net.corda.notary.bftsmart
package net.corda.notary.experimental.bftsmart
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.SettableFuture
@ -22,10 +22,11 @@ import net.corda.core.utilities.unwrap
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import java.security.PublicKey
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Table
import kotlin.concurrent.thread
@ -34,7 +35,7 @@ import kotlin.concurrent.thread
*
* A transaction is notarised when the consensus is reached by the cluster on its uniqueness, and time-window validity.
*/
class BftSmartNotaryService(
class BFTSmartNotaryService(
override val services: ServiceHubInternal,
override val notaryIdentityKey: PublicKey
) : NotaryService() {
@ -54,29 +55,27 @@ class BftSmartNotaryService(
}
private val notaryConfig = services.configuration.notary
?: throw IllegalArgumentException("Failed to register ${BftSmartNotaryService::class.java}: notary configuration not present")
?: throw IllegalArgumentException("Failed to register ${BFTSmartNotaryService::class.java}: notary configuration not present")
private val bftSMaRtConfig = try {
notaryConfig.extraConfig!!.parseAs<BFTSMaRtConfiguration>()
} catch (e: Exception) {
throw IllegalArgumentException("Failed to register ${BftSmartNotaryService::class.java}: BFT-Smart configuration not present")
}
private val bftSMaRtConfig = notaryConfig.bftSMaRt
?: throw IllegalArgumentException("Failed to register ${BFTSmartNotaryService::class.java}: BFT-Smart configuration not present")
private val cluster: BFTSMaRt.Cluster = makeBFTCluster(notaryIdentityKey, bftSMaRtConfig)
private val cluster: BFTSmart.Cluster = makeBFTCluster(notaryIdentityKey, bftSMaRtConfig)
protected open fun makeBFTCluster(notaryKey: PublicKey, bftSMaRtConfig: BFTSMaRtConfiguration): BFTSMaRt.Cluster {
return object : BFTSMaRt.Cluster {
protected open fun makeBFTCluster(notaryKey: PublicKey, bftSMaRtConfig: BFTSmartConfig): BFTSmart.Cluster {
return object : BFTSmart.Cluster {
override fun waitUntilAllReplicasHaveInitialized() {
log.warn("A BFT replica may still be initializing, in which case the upcoming consensus change may cause it to spin.")
}
}
}
private val client: BFTSMaRt.Client
private val client: BFTSmart.Client
private val replicaHolder = SettableFuture.create<Replica>()
init {
client = BFTSMaRtConfig(bftSMaRtConfig.clusterAddresses, bftSMaRtConfig.debug, bftSMaRtConfig.exposeRaces).use {
client = BFTSmartConfigInternal(bftSMaRtConfig.clusterAddresses, bftSMaRtConfig.debug, bftSMaRtConfig.exposeRaces)
.use {
val replicaId = bftSMaRtConfig.replicaId
val configHandle = it.handle()
// Replica startup must be in parallel with other replicas, otherwise the constructor may not return:
@ -87,7 +86,7 @@ class BftSmartNotaryService(
log.info("BFT SMaRt replica $replicaId is running.")
}
}
BFTSMaRt.Client(it, replicaId, cluster, this)
BFTSmart.Client(it, replicaId, cluster, this)
}
}
@ -100,7 +99,7 @@ class BftSmartNotaryService(
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = ServiceFlow(otherPartySession, this)
private class ServiceFlow(val otherSideSession: FlowSession, val service: BftSmartNotaryService) : FlowLogic<Void?>() {
private class ServiceFlow(val otherSideSession: FlowSession, val service: BFTSmartNotaryService) : FlowLogic<Void?>() {
@Suspendable
override fun call(): Void? {
val payload = otherSideSession.receive<NotarisationPayload>().unwrap { it }
@ -112,12 +111,12 @@ class BftSmartNotaryService(
private fun commit(payload: NotarisationPayload): NotarisationResponse {
val response = service.commitTransaction(payload, otherSideSession.counterparty)
when (response) {
is BFTSMaRt.ClusterResponse.Error -> {
is BFTSmart.ClusterResponse.Error -> {
// TODO: here we assume that all error will be the same, but there might be invalid onces from mailicious nodes
val responseError = response.errors.first().verified()
throw NotaryException(responseError, payload.coreTransaction.id)
}
is BFTSMaRt.ClusterResponse.Signatures -> {
is BFTSmart.ClusterResponse.Signatures -> {
log.debug("All input states of transaction ${payload.coreTransaction.id} have been committed")
return NotarisationResponse(response.txSignatures)
}
@ -125,6 +124,14 @@ class BftSmartNotaryService(
}
}
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}bft_committed_txs")
class CommittedTransaction(
@Id
@Column(name = "transaction_id", nullable = false, length = 64)
val transactionId: String
)
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}bft_committed_states")
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : PersistentUniquenessProvider.BaseComittedState(id, consumingTxHash)
@ -153,20 +160,20 @@ class BftSmartNotaryService(
)
}
private class Replica(config: BFTSMaRtConfig,
private class Replica(config: BFTSmartConfigInternal,
replicaId: Int,
createMap: () -> AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef>,
services: ServiceHubInternal,
notaryIdentityKey: PublicKey) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey) {
notaryIdentityKey: PublicKey) : BFTSmart.Replica(config, replicaId, createMap, services, notaryIdentityKey) {
override fun executeCommand(command: ByteArray): ByteArray {
val commitRequest = command.deserialize<BFTSMaRt.CommitRequest>()
val commitRequest = command.deserialize<BFTSmart.CommitRequest>()
verifyRequest(commitRequest)
val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity, commitRequest.payload.requestSignature)
return response.serialize().bytes
}
private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party, requestSignature: NotarisationRequestSignature): BFTSMaRt.ReplicaResponse {
private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party, requestSignature: NotarisationRequestSignature): BFTSmart.ReplicaResponse {
return try {
val id = transaction.id
val inputs = transaction.inputs
@ -176,17 +183,17 @@ class BftSmartNotaryService(
if (notary !in services.myInfo.legalIdentities) throw NotaryInternalException(NotaryError.WrongNotary)
commitInputStates(inputs, id, callerIdentity.name, requestSignature, timeWindow, references)
log.debug { "Inputs committed successfully, signing $id" }
BFTSMaRt.ReplicaResponse.Signature(sign(id))
BFTSmart.ReplicaResponse.Signature(sign(id))
} catch (e: NotaryInternalException) {
log.debug { "Error processing transaction: ${e.error}" }
val serializedError = e.error.serialize()
val errorSignature = sign(serializedError.bytes)
val signedError = SignedData(serializedError, errorSignature)
BFTSMaRt.ReplicaResponse.Error(signedError)
BFTSmart.ReplicaResponse.Error(signedError)
}
}
private fun verifyRequest(commitRequest: BFTSMaRt.CommitRequest) {
private fun verifyRequest(commitRequest: BFTSmart.CommitRequest) {
val transaction = commitRequest.payload.coreTransaction
val notarisationRequest = NotarisationRequest(transaction.inputs, transaction.id)
notarisationRequest.verifySignature(commitRequest.payload.requestSignature, commitRequest.callerIdentity)

View File

@ -1,18 +1,18 @@
package net.corda.notary.bftsmart
package net.corda.notary.experimental.bftsmart
import net.corda.core.schemas.MappedSchema
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.notary.bftsmart.BftSmartNotaryService
object BftSmartNotarySchema
object BFTSmartNotarySchema
object BftSmartNotarySchemaV1 : MappedSchema(
schemaFamily = BftSmartNotarySchema.javaClass,
object BFTSmartNotarySchemaV1 : MappedSchema(
schemaFamily = BFTSmartNotarySchema.javaClass,
version = 1,
mappedTypes = listOf(
PersistentUniquenessProvider.BaseComittedState::class.java,
PersistentUniquenessProvider.Request::class.java,
BftSmartNotaryService.CommittedState::class.java
BFTSmartNotaryService.CommittedState::class.java,
BFTSmartNotaryService.CommittedTransaction::class.java
)
) {
override val migrationResource: String?

View File

@ -1,4 +1,4 @@
package net.corda.notary.raft
package net.corda.notary.experimental.raft
import net.corda.core.utilities.NetworkHostAndPort

View File

@ -1,13 +1,12 @@
package net.corda.notary.raft
package net.corda.notary.experimental.raft
import net.corda.core.flows.FlowSession
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.utilities.seconds
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow
import net.corda.nodeapi.internal.config.parseAs
import java.security.PublicKey
/** A highly available notary service using the Raft algorithm to achieve consensus. */
@ -19,11 +18,9 @@ class RaftNotaryService(
?: throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: notary configuration not present")
override val uniquenessProvider = with(services) {
val raftConfig = try {
notaryConfig.extraConfig!!.parseAs<RaftConfig>()
} catch (e: Exception) {
throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: raft configuration not present")
}
val raftConfig = notaryConfig.raft
?: throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: raft configuration not present")
RaftUniquenessProvider(
configuration.baseDirectory,
configuration.p2pSslOptions,

View File

@ -1,4 +1,4 @@
package net.corda.notary.raft
package net.corda.notary.experimental.raft
import io.atomix.catalyst.buffer.BufferInput
import io.atomix.catalyst.buffer.BufferOutput
@ -20,10 +20,11 @@ import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
import net.corda.core.serialization.*
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
@ -90,28 +91,64 @@ class RaftTransactionCommitLog<E, EK>(
log.debug("State machine commit: attempting to store entries with keys (${commitCommand.states.joinToString()})")
checkConflict(commitCommand.states, StateConsumptionDetails.ConsumedStateType.INPUT_STATE)
checkConflict(commitCommand.references, StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
if (conflictingStates.isNotEmpty()) {
if (isConsumedByTheSameTx(commitCommand.txId.sha256(), conflictingStates)) {
null
if (commitCommand.states.isEmpty()) {
handleReferenceConflicts(txId, conflictingStates)
} else {
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
NotaryError.Conflict(txId, conflictingStates)
handleConflicts(txId, conflictingStates)
}
} else {
val outsideTimeWindowError = validateTimeWindow(clock.instant(), commitCommand.timeWindow)
if (outsideTimeWindowError == null) {
val entries = commitCommand.states.map { it to Pair(index, txId) }.toMap()
map.putAll(entries)
log.debug { "Successfully committed all input states: ${commitCommand.states}" }
null
} else {
outsideTimeWindowError
}
handleNoConflicts(commitCommand.timeWindow, commitCommand.states, txId, index)
}
}
}
}
private fun handleReferenceConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>): NotaryError? {
if (!previouslyCommitted(txId)) {
val conflictError = NotaryError.Conflict(txId, conflictingStates)
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
return conflictError
}
log.debug { "Transaction $txId already notarised" }
return null
}
private fun handleConflicts(txId: SecureHash, conflictingStates: java.util.LinkedHashMap<StateRef, StateConsumptionDetails>): NotaryError? {
return if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
log.debug { "Transaction $txId already notarised" }
null
} else {
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
NotaryError.Conflict(txId, conflictingStates)
}
}
private fun handleNoConflicts(timeWindow: TimeWindow?, states: List<StateRef>, txId: SecureHash, index: Long): NotaryError? {
// Skip if this is a re-notarisation of a reference-only transaction
if (states.isEmpty() && previouslyCommitted(txId)) {
return null
}
val outsideTimeWindowError = validateTimeWindow(clock.instant(), timeWindow)
return if (outsideTimeWindowError == null) {
val entries = states.map { it to Pair(index, txId) }.toMap()
map.putAll(entries)
val session = currentDBSession()
session.persist(RaftUniquenessProvider.CommittedTransaction(txId.toString()))
log.debug { "Successfully committed all input states: $states" }
null
} else {
outsideTimeWindowError
}
}
private fun previouslyCommitted(txId: SecureHash): Boolean {
val session = currentDBSession()
return session.find(RaftUniquenessProvider.CommittedTransaction::class.java, txId.toString()) != null
}
private fun logRequest(commitCommand: Commands.CommitTransaction) {
val request = PersistentUniquenessProvider.Request(
consumingTxHash = commitCommand.txId.toString(),

View File

@ -1,4 +1,4 @@
package net.corda.notary.raft
package net.corda.notary.experimental.raft
import com.codahale.metrics.Gauge
import com.codahale.metrics.MetricRegistry
@ -22,8 +22,9 @@ import net.corda.core.identity.Party
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
@ -31,15 +32,12 @@ import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.notary.raft.RaftTransactionCommitLog.Commands.CommitTransaction
import net.corda.notary.experimental.raft.RaftTransactionCommitLog.Commands.CommitTransaction
import java.nio.file.Path
import java.time.Clock
import java.util.concurrent.CompletableFuture
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.Column
import javax.persistence.EmbeddedId
import javax.persistence.Entity
import javax.persistence.Table
import javax.persistence.*
/**
* A uniqueness provider that records committed input states in a distributed collection replicated and
@ -51,7 +49,8 @@ import javax.persistence.Table
*/
@ThreadSafe
class RaftUniquenessProvider(
private val storagePath: Path,
/** If *null* the Raft log will be stored in memory. */
private val storagePath: Path? = null,
private val transportConfiguration: MutualSslConfiguration,
private val db: CordaPersistence,
private val clock: Clock,
@ -61,24 +60,26 @@ class RaftUniquenessProvider(
) : UniquenessProvider, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
fun createMap(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, CommittedState, PersistentStateRef> =
fun createMap(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, CommittedState, String> =
AppendOnlyPersistentMap(
cacheFactory = cacheFactory,
name = "RaftUniquenessProvider_transactions",
toPersistentEntityKey = { PersistentStateRef(it) },
toPersistentEntityKey = { it.encoded() },
fromPersistentEntity = {
val txId = it.id.txId
val index = it.id.index
Pair(
StateRef(txhash = SecureHash.parse(txId), index = index),
Pair(it.index, SecureHash.parse(it.value) as SecureHash))
it.key.parseStateRef(),
Pair(
it.index,
it.value.deserialize<SecureHash>(context = SerializationDefaults.STORAGE_CONTEXT)
)
)
},
toPersistentEntity = { k: StateRef, (first, second) ->
CommittedState(
PersistentStateRef(k),
second.toString(),
first)
CommittedState().apply {
key = k.encoded()
value = second.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
index = first
}
},
persistentEntityClass = CommittedState::class.java
@ -91,14 +92,26 @@ class RaftUniquenessProvider(
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}raft_committed_states")
class CommittedState(
@EmbeddedId
val id: PersistentStateRef,
@Column(name = "consuming_transaction_id", nullable = true)
var value: String? = "",
@Column(name = "raft_log_index", nullable = false)
@Id
@Column(name = "id", nullable = false)
var key: String = "",
@Lob
@Column(name = "state_value", nullable = false)
var value: ByteArray = ByteArray(0),
@Column(name = "state_index")
var index: Long = 0
)
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}raft_committed_txs")
class CommittedTransaction(
@Id
@Column(name = "transaction_id", nullable = false, length = 64)
val transactionId: String
)
private lateinit var _clientFuture: CompletableFuture<CopycatClient>
private lateinit var server: CopycatServer
@ -110,9 +123,9 @@ class RaftUniquenessProvider(
get() = _clientFuture.get()
fun start() {
log.info("Creating Copycat server, log stored in: ${storagePath.toAbsolutePath()}")
log.info("Creating Copycat server, log stored in: ${storagePath?.toAbsolutePath() ?: " memory"}")
val stateMachineFactory = {
RaftTransactionCommitLog(db, clock, { createMap(cacheFactory) })
RaftTransactionCommitLog(db, clock) { createMap(cacheFactory) }
}
val address = raftConfig.nodeAddress.let { Address(it.host, it.port) }
val storage = buildStorage(storagePath)
@ -149,11 +162,14 @@ class RaftUniquenessProvider(
server.shutdown()
}
private fun buildStorage(storagePath: Path): Storage? {
return Storage.builder()
.withDirectory(storagePath.toFile())
.withStorageLevel(StorageLevel.DISK)
.build()
private fun buildStorage(storagePath: Path?): Storage? {
val builder = Storage.builder()
if (storagePath != null) {
builder.withDirectory(storagePath.toFile()).withStorageLevel(StorageLevel.DISK)
} else {
builder.withStorageLevel(StorageLevel.MEMORY)
}
return builder.build()
}
private fun buildTransport(config: MutualSslConfiguration): Transport? {

View File

@ -1,4 +1,4 @@
package net.corda.notary.raft
package net.corda.notary.experimental.raft
import net.corda.core.schemas.MappedSchema
import net.corda.node.services.transactions.PersistentUniquenessProvider
@ -11,7 +11,8 @@ object RaftNotarySchemaV1 : MappedSchema(
mappedTypes = listOf(
PersistentUniquenessProvider.BaseComittedState::class.java,
PersistentUniquenessProvider.Request::class.java,
RaftUniquenessProvider.CommittedState::class.java
RaftUniquenessProvider.CommittedState::class.java,
RaftUniquenessProvider.CommittedTransaction::class.java
)
) {
override val migrationResource: String?

View File

@ -0,0 +1,14 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<changeSet author="R3.Corda" id="create-bft-committed-transactions-table">
<createTable tableName="node_bft_committed_txs">
<column name="transaction_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="transaction_id" constraintName="node_bft_transactions_pkey" tableName="node_bft_committed_txs"/>
</changeSet>
</databaseChangeLog>

View File

@ -3,7 +3,8 @@
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<include file="migration/node-bft-smart.changelog-init.xml"/>
<include file="migration/node-bft-smart.changelog-v1.xml"/>
<include file="migration/node-bft-smart.changelog-pkey.xml"/>
<include file="migration/notary-bft-smart.changelog-init.xml"/>
<include file="migration/notary-bft-smart.changelog-v1.xml"/>
<include file="migration/notary-bft-smart.changelog-pkey.xml"/>
<include file="migration/notary-bft-smart.changelog-committed-transactions-table.xml"/>
</databaseChangeLog>

View File

@ -0,0 +1,14 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<changeSet author="R3.Corda" id="create-raft-committed-transactions-table">
<createTable tableName="node_raft_committed_txs">
<column name="transaction_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="transaction_id" constraintName="node_raft_transactions_pkey" tableName="node_raft_committed_txs"/>
</changeSet>
</databaseChangeLog>

View File

@ -6,14 +6,11 @@
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="1511451595465-18">
<createTable tableName="node_raft_committed_states">
<column name="transaction_id" type="NVARCHAR(64)">
<column name="id" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="output_index" type="INT">
<constraints nullable="false"/>
</column>
<column name="raft_log_index" type="BIGINT"/>
<column name="consuming_transaction_id" type="NVARCHAR(64)"/>
<column name="state_index" type="BIGINT"/>
<column name="state_value" type="BLOB"/>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="1521131680317-17">
@ -36,7 +33,7 @@
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="1511451595465-43">
<addPrimaryKey columnNames="output_index, transaction_id" constraintName="node_raft_state_pkey"
<addPrimaryKey columnNames="id" constraintName="node_raft_state_pkey"
tableName="node_raft_committed_states"/>
</changeSet>
<changeSet author="R3.Corda" id="1521131680317-48">

View File

@ -3,9 +3,8 @@
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<include file="migration/node-notary-raft.changelog-init.xml"/>
<include file="migration/node-notary-raft.changelog-v1.xml"/>
<include file="migration/node-notary-raft.changelog-pkey.xml"/>
<include file="migration/notary-raft.changelog-init.xml"/>
<include file="migration/notary-raft.changelog-v1.xml"/>
<include file="migration/notary-raft.changelog-pkey.xml"/>
<include file="migration/notary-raft.changelog-committed-transactions-table.xml" />
</databaseChangeLog>

View File

@ -5,7 +5,7 @@
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="nullability">
<addNotNullConstraint tableName="node_raft_committed_states" columnName="raft_log_index" columnDataType="BIGINT"/>
<addNotNullConstraint tableName="node_raft_committed_states" columnName="consuming_transaction_id" columnDataType="NVARCHAR(64)"/>
<addNotNullConstraint tableName="node_raft_committed_states" columnName="state_index" columnDataType="BIGINT"/>
<addNotNullConstraint tableName="node_raft_committed_states" columnName="state_value" columnDataType="BLOB"/>
</changeSet>
</databaseChangeLog>

View File

@ -1,16 +1,15 @@
package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryFlow
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
@ -26,6 +25,9 @@ import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.network.NetworkParametersCopier
@ -35,15 +37,8 @@ import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockNetFlowTimeOut
import net.corda.testing.node.MockNetNotaryConfig
import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.MockNodeConfigOverrides
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.cordappsForPackages
import net.corda.testing.node.internal.startFlow
import net.corda.testing.node.internal.*
import org.junit.AfterClass
import org.junit.Before
import org.junit.BeforeClass
@ -88,7 +83,6 @@ class TimedFlowTests {
notary = started.first
node = started.second
patientNode = started.third
}
@AfterClass
@ -105,33 +99,38 @@ class TimedFlowTests {
serviceLegalName)
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, false))))
val notaryConfig = MockNetNotaryConfig(
serviceLegalName = serviceLegalName,
validating = false,
className = TestNotaryService::class.java.name
)
val notaryConfig = mock<NotaryConfig> {
whenever(it.serviceLegalName).thenReturn(serviceLegalName)
whenever(it.validating).thenReturn(true)
whenever(it.className).thenReturn(TestNotaryService::class.java.name)
}
val notaryNodes = (0 until CLUSTER_SIZE).map {
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = MockNodeConfigOverrides(
notary = notaryConfig
)))
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
doReturn(notaryConfig).whenever(it).notary
}))
}
val aliceNode = mockNet.createUnstartedNode(
InternalMockNodeParameters(
legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(2.seconds, 3, 1.0))
configOverrides = { conf: NodeConfiguration ->
val retryConfig = FlowTimeoutConfiguration(1.seconds, 3, 1.0)
doReturn(retryConfig).whenever(conf).flowTimeout
}
)
)
val patientNode = mockNet.createUnstartedNode(
InternalMockNodeParameters(
legalName = CordaX500Name("Bob", "BobCorp", "GB"),
configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(10.seconds, 3, 1.0))
configOverrides = { conf: NodeConfiguration ->
val retryConfig = FlowTimeoutConfiguration(10.seconds, 3, 1.0)
doReturn(retryConfig).whenever(conf).flowTimeout
}
)
)
// MockNetwork doesn't support notary clusters, so we create all the nodes we need unstarted, and then install the
// network-parameters in their directories before they're started.
val nodes = (notaryNodes + aliceNode + patientNode).map { node ->

View File

@ -1,6 +1,8 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
@ -9,8 +11,7 @@ import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.packageName
import net.corda.core.utilities.seconds
import net.corda.testing.node.MockNetFlowTimeOut
import net.corda.testing.node.MockNodeConfigOverrides
import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.testing.node.internal.*
import org.junit.After
import org.junit.Before
@ -35,7 +36,10 @@ class IdempotentFlowTests {
mockNet = InternalMockNetwork(threadPerNode = true, cordappsForAllNodes = cordappsForPackages(this.javaClass.packageName))
nodeA = mockNet.createNode(InternalMockNodeParameters(
legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(1.seconds, 3, 1.0))
configOverrides = {
val retryConfig = FlowTimeoutConfiguration(1.seconds, 3, 1.0)
doReturn(retryConfig).whenever(it).flowTimeout
}
))
nodeB = mockNet.createNode()
mockNet.startNodes()
@ -71,7 +75,7 @@ class IdempotentFlowTests {
@Suspendable
override fun call() {
subFlowExecutionCounter.incrementAndGet() // No checkpoint should be taken before invoking IdempotentSubFlow,
// so this should be replayed when TimedSubFlow restarts.
// so this should be replayed when TimedSubFlow restarts.
subFlow(IdempotentSubFlow()) // Checkpoint shouldn't be taken before invoking the sub-flow.
}
}

View File

@ -1,5 +1,6 @@
package net.corda.node.services.transactions
import com.codahale.metrics.MetricRegistry
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.NullKeys
@ -10,16 +11,21 @@ import net.corda.core.flows.NotaryError
import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.minutes
import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.experimental.raft.RaftConfig
import net.corda.notary.experimental.raft.RaftNotarySchemaV1
import net.corda.notary.experimental.raft.RaftUniquenessProvider
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.generateStateRef
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.internal.configureTestSSL
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.TestClock
import org.junit.After
@ -39,7 +45,8 @@ class UniquenessProviderTests(
@JvmStatic
@Parameterized.Parameters(name = "{0}")
fun data(): Collection<UniquenessProviderFactory> = listOf(
PersistentUniquenessProviderFactory()
PersistentUniquenessProviderFactory(),
RaftUniquenessProviderFactory()
)
}
@ -152,11 +159,16 @@ class UniquenessProviderTests(
.get()
assertEquals(UniquenessProvider.Result.Success, result)
// Idempotency: can re-notarise successfully.
testClock.advanceBy(90.minutes)
val result2 = uniquenessProvider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState))
// The reference state gets consumed.
val result2 = uniquenessProvider.commit(listOf(referenceState), SecureHash.randomSHA256(), identity, requestSignature, timeWindow)
.get()
assertEquals(UniquenessProvider.Result.Success, result2)
// Idempotency: can re-notarise successfully.
testClock.advanceBy(90.minutes)
val result3 = uniquenessProvider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState))
.get()
assertEquals(UniquenessProvider.Result.Success, result3)
}
@Test
@ -389,3 +401,34 @@ class PersistentUniquenessProviderFactory : UniquenessProviderFactory {
database?.close()
}
}
class RaftUniquenessProviderFactory : UniquenessProviderFactory {
private var database: CordaPersistence? = null
private var provider: RaftUniquenessProvider? = null
override fun create(clock: Clock): UniquenessProvider {
database?.close()
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(extraSchemas = setOf(RaftNotarySchemaV1)))
val testSSL = configureTestSSL(CordaX500Name("Raft", "London", "GB"))
val raftNodePort = 10987
return RaftUniquenessProvider(
null,
testSSL,
database!!,
clock,
MetricRegistry(),
TestingNamedCacheFactory(),
RaftConfig(NetworkHostAndPort("localhost", raftNodePort), emptyList())
).apply {
start()
provider = this
}
}
override fun cleanUp() {
provider?.stop()
database?.close()
}
}

View File

@ -1,5 +1,7 @@
package net.corda.notary.bftsmart
package net.corda.notary.experimental.bftsmart
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateRef
@ -19,15 +21,16 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.Try
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.notary.experimental.bftsmart.BFTSmartConfig
import net.corda.notary.experimental.bftsmart.minClusterSize
import net.corda.notary.experimental.bftsmart.minCorrectReplicas
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.MockNetNotaryConfig
import net.corda.testing.node.MockNodeConfigOverrides
import net.corda.testing.node.TestClock
import net.corda.testing.node.internal.*
import org.hamcrest.Matchers.instanceOf
@ -54,7 +57,7 @@ class BFTNotaryServiceTests {
@BeforeClass
@JvmStatic
fun before() {
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts", "net.corda.notary.bftsmart"))
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"))
val clusterSize = minClusterSize(1)
val started = startBftClusterAndNode(clusterSize, mockNet)
notary = started.first
@ -80,12 +83,15 @@ class BFTNotaryServiceTests {
val clusterAddresses = replicaIds.map { NetworkHostAndPort("localhost", 11000 + it * 10) }
val nodes = replicaIds.map { replicaId ->
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = MockNodeConfigOverrides(notary = MockNetNotaryConfig(
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
val notary = NotaryConfig(
validating = false,
extraConfig = BFTSMaRtConfiguration(replicaId, clusterAddresses, exposeRaces = exposeRaces).toConfig(),
className = "net.corda.notary.bftsmart.BftSmartNotaryService",
bftSMaRt = BFTSmartConfig(replicaId, clusterAddresses, exposeRaces = exposeRaces),
serviceLegalName = serviceLegalName
))))
)
doReturn(notary).whenever(it).notary
}))
} + mockNet.createUnstartedNode()
// MockNetwork doesn't support BFT clusters, so we create all the nodes we need unstarted, and then install the

View File

@ -1,12 +1,16 @@
package net.corda.notary.bftsmart
package net.corda.notary.experimental.bftsmart
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.notary.bftsmart.BFTSMaRtConfig.Companion.portIsClaimedFormat
import net.corda.notary.experimental.bftsmart.BFTSmartConfigInternal
import net.corda.notary.experimental.bftsmart.BFTSmartConfigInternal.Companion.portIsClaimedFormat
import net.corda.notary.experimental.bftsmart.maxFaultyReplicas
import net.corda.notary.experimental.bftsmart.minClusterSize
import net.corda.notary.experimental.bftsmart.minCorrectReplicas
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import kotlin.test.assertEquals
class BFTSMaRtConfigTests {
class BFTSmartConfigTests {
@Test
fun `replica arithmetic`() {
(1..20).forEach { n ->
@ -28,7 +32,7 @@ class BFTSMaRtConfigTests {
@Test
fun `overlapping port ranges are rejected`() {
fun config(vararg ports: Int) = BFTSMaRtConfig(ports.map { NetworkHostAndPort("localhost", it) }, false, false)
fun config(vararg ports: Int) = BFTSmartConfigInternal(ports.map { NetworkHostAndPort("localhost", it) }, false, false)
assertThatThrownBy { config(11000, 11001).use {} }
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessage(portIsClaimedFormat.format("localhost:11001", setOf("localhost:11000", "localhost:11001")))

View File

@ -1,4 +1,4 @@
package net.corda.notary.raft
package net.corda.notary.experimental.raft
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef

View File

@ -1,4 +1,4 @@
package net.corda.notary.raft
package net.corda.notary.experimental.raft
import io.atomix.catalyst.transport.Address
import io.atomix.copycat.client.ConnectionStrategies
@ -17,6 +17,7 @@ import net.corda.core.utilities.getOrThrow
import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.experimental.raft.RaftNotarySchemaV1
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.internal.incrementalPortAllocation

View File

@ -0,0 +1,118 @@
# Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
############################################
####### Communication Configurations #######
############################################
#HMAC algorithm used to authenticate messages between processes (HmacMD5 is the default value)
#This parameter is not currently being used being used
#system.authentication.hmacAlgorithm = HmacSHA1
#Specify if the communication system should use a thread to send data (true or false)
system.communication.useSenderThread = true
#Force all processes to use the same public/private keys pair and secret key. This is useful when deploying experiments
#and benchmarks, but must not be used in production systems.
system.communication.defaultkeys = true
############################################
### Replication Algorithm Configurations ###
############################################
#Number of servers in the group
system.servers.num = %s
#Maximum number of faulty replicas
system.servers.f = %s
#Timeout to asking for a client request
system.totalordermulticast.timeout = 2000
#Maximum batch size (in number of messages)
system.totalordermulticast.maxbatchsize = 400
#Number of nonces (for non-determinism actions) generated
system.totalordermulticast.nonces = 10
#if verification of leader-generated timestamps are increasing
#it can only be used on systems in which the network clocks
#are synchronized
system.totalordermulticast.verifyTimestamps = false
#Quantity of messages that can be stored in the receive queue of the communication system
system.communication.inQueueSize = 500000
# Quantity of messages that can be stored in the send queue of each replica
system.communication.outQueueSize = 500000
#Set to 1 if SMaRt should use signatures, set to 0 if otherwise
system.communication.useSignatures = 0
#Set to 1 if SMaRt should use MAC's, set to 0 if otherwise
system.communication.useMACs = 1
#Set to 1 if SMaRt should use the standard output to display debug messages, set to 0 if otherwise
system.debug = %s
#Print information about the replica when it is shutdown
system.shutdownhook = true
############################################
###### State Transfer Configurations #######
############################################
#Activate the state transfer protocol ('true' to activate, 'false' to de-activate)
system.totalordermulticast.state_transfer = false
#Maximum ahead-of-time message not discarded
system.totalordermulticast.highMark = 10000
#Maximum ahead-of-time message not discarded when the replica is still on EID 0 (after which the state transfer is triggered)
system.totalordermulticast.revival_highMark = 10
#Number of ahead-of-time messages necessary to trigger the state transfer after a request timeout occurs
system.totalordermulticast.timeout_highMark = 200
############################################
###### Log and Checkpoint Configurations ###
############################################
system.totalordermulticast.log = false
system.totalordermulticast.log_parallel = false
system.totalordermulticast.log_to_disk = false
system.totalordermulticast.sync_log = false
#Period at which BFT-SMaRt requests the state to the application (for the state transfer state protocol)
system.totalordermulticast.checkpoint_period = 1
system.totalordermulticast.global_checkpoint_period = 1
system.totalordermulticast.checkpoint_to_disk = false
system.totalordermulticast.sync_ckp = false
############################################
###### Reconfiguration Configurations ######
############################################
#Replicas ID for the initial view, separated by a comma.
# The number of replicas in this parameter should be equal to that specified in 'system.servers.num'
system.initial.view = %s
#The ID of the trust third party (TTP)
system.ttp.id = 7002
#This sets if the system will function in Byzantine or crash-only mode. Set to "true" to support Byzantine faults
system.bft = true

View File

@ -22,10 +22,6 @@ dependencies {
cordaCompile project(':client:jfx')
cordaCompile project(':client:rpc')
cordaCompile project(':test-utils')
// Notary implementations
cordapp project(':experimental:notary-raft')
cordapp project(':experimental:notary-bft-smart')
}
def nodeTask = tasks.getByPath(':node:capsule:assemble')
@ -87,11 +83,10 @@ task deployNodesCustom(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
}
task deployNodesRaft(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
def className = "net.corda.notary.raft.RaftNotaryService"
def className = ""
directory file("$buildDir/nodes/nodesRaft")
nodeDefaults {
extraConfig = [h2Settings: [address: "localhost:0"]]
cordapp project(':experimental:notary-raft')
}
node {
name "O=Alice Corp,L=Madrid,C=ES"
@ -112,10 +107,9 @@ task deployNodesRaft(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [
validating: true,
serviceLegalName: "O=Raft,L=Zurich,C=CH",
extraConfig: [
raft: [
nodeAddress: "localhost:10008"
],
className: className
]
]
}
node {
@ -128,11 +122,10 @@ task deployNodesRaft(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [
validating: true,
serviceLegalName: "O=Raft,L=Zurich,C=CH",
extraConfig: [
raft: [
nodeAddress: "localhost:10012",
clusterAddresses: ["localhost:10008"]
],
className: className
]
]
}
node {
@ -145,22 +138,19 @@ task deployNodesRaft(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [
validating: true,
serviceLegalName: "O=Raft,L=Zurich,C=CH",
extraConfig: [
raft: [
nodeAddress: "localhost:10016",
clusterAddresses: ["localhost:10008"]
],
className: className
]
]
}
}
task deployNodesBFT(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
def clusterAddresses = ["localhost:11000", "localhost:11010", "localhost:11020", "localhost:11030"]
def className = "net.corda.notary.bftsmart.BftSmartNotaryService"
directory file("$buildDir/nodes/nodesBFT")
nodeDefaults {
extraConfig = [h2Settings: [address: "localhost:0"]]
cordapp project(':experimental:notary-bft-smart')
}
node {
name "O=Alice Corp,L=Madrid,C=ES"
@ -181,11 +171,10 @@ task deployNodesBFT(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [
validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH",
extraConfig: [
bftSMaRt: [
replicaId: 0,
clusterAddresses: clusterAddresses
],
className: className
]
]
}
node {
@ -198,11 +187,10 @@ task deployNodesBFT(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [
validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH",
extraConfig: [
bftSMaRt: [
replicaId: 1,
clusterAddresses: clusterAddresses
],
className: className
]
]
}
node {
@ -215,11 +203,10 @@ task deployNodesBFT(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [
validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH",
extraConfig: [
bftSMaRt: [
replicaId: 2,
clusterAddresses: clusterAddresses
],
className: className
]
]
}
node {
@ -232,11 +219,10 @@ task deployNodesBFT(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [
validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH",
extraConfig: [
bftSMaRt: [
replicaId: 3,
clusterAddresses: clusterAddresses
],
className: className
]
]
}
}

View File

@ -1,6 +1,7 @@
package net.corda.notarydemo.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.BelongsToContract
import net.corda.core.contracts.CommandData
import net.corda.core.contracts.Contract
import net.corda.core.contracts.ContractState
@ -24,6 +25,7 @@ class DummyIssueAndMove(private val notary: Party, private val counterpartyNode:
data class DummyCommand(val dummy: Int = 0) : CommandData
@BelongsToContract(DoNothingContract::class)
data class State(override val participants: List<AbstractParty>, val discriminator: Int) : ContractState
@Suspendable

View File

@ -25,8 +25,6 @@ include 'experimental:avalanche'
include 'experimental:behave'
include 'experimental:quasar-hook'
include 'experimental:corda-utils'
include 'experimental:notary-raft'
include 'experimental:notary-bft-smart'
include 'jdk8u-deterministic'
include 'test-common'
include 'test-cli'

View File

@ -25,8 +25,6 @@ sourceSets {
}
dependencies {
// Bundling in the Raft notary service for tests involving distributed notaries
compile project(':experimental:notary-raft')
compile project(':test-utils')
// Integration test helpers

View File

@ -37,6 +37,7 @@ import net.corda.nodeapi.internal.crypto.X509KeyStore
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.notary.experimental.raft.RaftConfig
import net.corda.serialization.internal.amqp.AbstractAMQPSerializationScheme
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
@ -494,16 +495,15 @@ class DriverDSLImpl(
private fun startRaftNotaryCluster(spec: NotarySpec, localNetworkMap: LocalNetworkMap?): CordaFuture<List<NodeHandle>> {
fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map<String, Any> {
val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList()
val config = configOf("notary" to mapOf(
"validating" to spec.validating,
"serviceLegalName" to spec.name.toString(),
"className" to "net.corda.notary.raft.RaftNotaryService",
"extraConfig" to mapOf(
"nodeAddress" to nodeAddress.toString(),
"clusterAddresses" to clusterAddresses.map { it.toString() }
))
val config = NotaryConfig(
validating = spec.validating,
serviceLegalName = spec.name,
raft = RaftConfig(
nodeAddress = nodeAddress,
clusterAddresses = clusterAddresses
)
)
return config.root().unwrapped()
return mapOf("notary" to config.toConfig().root().unwrapped())
}
val nodeNames = generateNodeNames(spec)

View File

@ -35,10 +35,7 @@ import net.corda.node.internal.NodeFlowManager
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.NetworkParameterAcceptanceSettings
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.node.services.config.*
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.keys.KeyManagementServiceInternal
@ -90,7 +87,7 @@ data class InternalMockNodeParameters(
val forcedID: Int? = null,
val legalName: CordaX500Name? = null,
val entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()),
val configOverrides: MockNodeConfigOverrides? = null,
val configOverrides: (NodeConfiguration) -> Any? = {},
val version: VersionInfo = MOCK_VERSION_INFO,
val additionalCordapps: Collection<TestCordappInternal> = emptyList(),
val flowManager: MockNodeFlowManager = MockNodeFlowManager()) {
@ -98,7 +95,7 @@ data class InternalMockNodeParameters(
mockNodeParameters.forcedID,
mockNodeParameters.legalName,
mockNodeParameters.entropyRoot,
mockNodeParameters.configOverrides,
{ mockNodeParameters.configOverrides?.applyMockNodeOverrides(it) },
MOCK_VERSION_INFO,
uncheckedCast(mockNodeParameters.additionalCordapps)
)
@ -260,7 +257,7 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
return notarySpecs.map { (name, validating) ->
createNode(InternalMockNodeParameters(
legalName = name,
configOverrides = MockNodeConfigOverrides(notary = MockNetNotaryConfig(validating))
configOverrides = { doReturn(NotaryConfig(validating)).whenever(it).notary }
))
}
}
@ -468,7 +465,7 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
doReturn(makeTestDataSourceProperties("node_${id}_net_$networkId")).whenever(it).dataSourceProperties
doReturn(emptyList<SecureHash>()).whenever(it).extraNetworkMapKeys
doReturn(listOf(baseDirectory / "cordapps")).whenever(it).cordappDirectories
parameters.configOverrides?.applyMockNodeOverrides(it)
parameters.configOverrides(it)
}
TestCordappInternal.installCordapps(baseDirectory, parameters.additionalCordapps.toSet(), combinedCordappsForAllNodes)

View File

@ -9,11 +9,7 @@ import net.corda.testing.node.MockNetNotaryConfig
import net.corda.testing.node.MockNodeConfigOverrides
fun MockNetNotaryConfig.toNotaryConfig(): NotaryConfig {
return if (this.className == null) {
NotaryConfig(validating = this.validating, extraConfig = this.extraConfig, serviceLegalName = this.serviceLegalName)
} else {
NotaryConfig(validating = this.validating, extraConfig = this.extraConfig, serviceLegalName = this.serviceLegalName, className = this.className)
}
return NotaryConfig(validating = this.validating, extraConfig = this.extraConfig, serviceLegalName = this.serviceLegalName, className = this.className)
}
fun MockNodeConfigOverrides.applyMockNodeOverrides(config: NodeConfiguration) {