Merge pull request #1496 from corda/andrius/merge-10-22

Andrius/merge 10 22
This commit is contained in:
Andrius Dagys
2018-10-22 18:22:01 +01:00
committed by GitHub
30 changed files with 262 additions and 230 deletions

View File

@ -16,6 +16,29 @@ Unreleased
* Vault storage of contract state constraints metadata and associated vault query functions to retrieve and sort by constraint type. * Vault storage of contract state constraints metadata and associated vault query functions to retrieve and sort by constraint type.
* UPGRADE REQUIRED: changes have been made to how notary implementations are configured and loaded.
No upgrade steps are required for the single-node notary (both validating and non-validating variants).
Other notary implementations have been moved out of the Corda node into individual Cordapps, and require configuration
file updates.
To run a notary you will now need to include the appropriate notary CorDapp in the ``cordapps/`` directory:
* ``corda-notary-raft`` for the Raft notary.
* ``corda-notary-bft-smart`` for the BFT-Smart notary.
It is now required to specify the fully qualified notary service class name, ``className``, and the legal name of
the notary service in case of distributed notaries: ``serviceLegalName``.
Implementation-specific configuration values have been moved to the ``extraConfig`` configuration block.
Example configuration changes for the Raft notary:
.. image:: resources/notary-config-update.png
Example configuration changes for the BFT-Smart notary:
.. image:: resources/notary-config-update-bft.png
* New overload for ``CordaRPCClient.start()`` method allowing to specify target legal identity to use for RPC call. * New overload for ``CordaRPCClient.start()`` method allowing to specify target legal identity to use for RPC call.
* Case insensitive vault queries can be specified via a boolean on applicable SQL criteria builder operators. By default * Case insensitive vault queries can be specified via a boolean on applicable SQL criteria builder operators. By default

View File

@ -151,34 +151,17 @@ absolute path to the node's base directory.
:security: Contains various nested fields controlling user authentication/authorization, in particular for RPC accesses. See :security: Contains various nested fields controlling user authentication/authorization, in particular for RPC accesses. See
:doc:`clientrpc` for details. :doc:`clientrpc` for details.
:notary: Optional configuration object which if present configures the node to run as a notary. If part of a Raft or BFT SMaRt :notary: Optional configuration object which if present configures the node to run as a notary.
cluster then specify ``raft`` or ``bftSMaRt`` respectively as described below. If a single node notary then omit both.
:validating: Boolean to determine whether the notary is a validating or non-validating one. :validating: Boolean to determine whether the notary is a validating or non-validating one.
:serviceLegalName: If the node is part of a distributed cluster, specify the legal name of the cluster. At runtime, Corda :serviceLegalName: If the node is part of a distributed cluster, specify the legal name of the cluster. At runtime, Corda
checks whether this name matches the name of the certificate of the notary cluster. checks whether this name matches the name of the certificate of the notary cluster.
:raft: If part of a distributed Raft cluster specify this config object, with the following settings: :className: The fully qualified class name of the notary service to run. The class is expected to be loaded from
a notary CorDapp. Defaults to run the ``SimpleNotaryService``, which is built in.
:nodeAddress: The host and port to which to bind the embedded Raft server. Note that the Raft cluster uses a :extraConfig: an optional configuration block for providing notary implementation-specific values.
separate transport layer for communication that does not integrate with ArtemisMQ messaging services.
:clusterAddresses: Must list the addresses of all the members in the cluster. At least one of the members must
be active and be able to communicate with the cluster leader for the node to join the cluster. If empty, a
new cluster will be bootstrapped.
:bftSMaRt: If part of a distributed BFT-SMaRt cluster specify this config object, with the following settings:
:replicaId: The zero-based index of the current replica. All replicas must specify a unique replica id.
:clusterAddresses: Must list the addresses of all the members in the cluster. At least one of the members must
be active and be able to communicate with the cluster leader for the node to join the cluster. If empty, a
new cluster will be bootstrapped.
:custom: If `true`, will load and install a notary service from a CorDapp. See :doc:`tutorial-custom-notary`.
Only one of ``raft``, ``bftSMaRt`` or ``custom`` configuration values may be specified.
:rpcUsers: A list of users who are authorised to access the RPC system. Each user in the list is a config object with the :rpcUsers: A list of users who are authorised to access the RPC system. Each user in the list is a config object with the
following fields: following fields:

Binary file not shown.

After

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 84 KiB

View File

@ -48,8 +48,6 @@ Command-line options
The node can optionally be started with the following command-line options: The node can optionally be started with the following command-line options:
* ``--base-directory``, ``-b``: The node working directory where all the files are kept (default: ``.``). * ``--base-directory``, ``-b``: The node working directory where all the files are kept (default: ``.``).
* ``--bootstrap-raft-cluster``: Bootstraps Raft cluster. The node forms a single node cluster (ignoring otherwise configured peer
addresses), acting as a seed for other nodes to join the cluster.
* ``--clear-network-map-cache``, ``-c``: Clears local copy of network map, on node startup it will be restored from server or file system. * ``--clear-network-map-cache``, ``-c``: Clears local copy of network map, on node startup it will be restored from server or file system.
* ``--config-file``, ``-f``: The path to the config file. Defaults to ``node.conf``. * ``--config-file``, ``-f``: The path to the config file. Defaults to ``node.conf``.
* ``--dev-mode``, ``-d``: Runs the node in developer mode. Unsafe in production. Defaults to true on MacOS and desktop versions of Windows. False otherwise. * ``--dev-mode``, ``-d``: Runs the node in developer mode. Unsafe in production. Defaults to true on MacOS and desktop versions of Windows. False otherwise.

View File

@ -13,6 +13,24 @@ import java.net.SocketException
import java.nio.file.Files import java.nio.file.Files
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
data class BFTSMaRtConfiguration(
/** The zero-based index of the current replica. All replicas must specify a unique replica id. */
val replicaId: Int,
/**
* Must list the addresses of all the members in the cluster. At least one of the members must be active and
* be able to communicate with the cluster leader for the node to join the cluster. If empty,
* a new cluster will be bootstrapped.
*/
val clusterAddresses: List<NetworkHostAndPort>,
val debug: Boolean = false,
/** Used for testing purposes only. */
val exposeRaces: Boolean = false
) {
init {
require(replicaId >= 0) { "replicaId cannot be negative" }
}
}
/** /**
* BFT SMaRt can only be configured via files in a configHome directory. * BFT SMaRt can only be configured via files in a configHome directory.
* Each instance of this class creates such a configHome, accessible via [path]. * Each instance of this class creates such a configHome, accessible via [path].

View File

@ -20,9 +20,9 @@ import net.corda.core.utilities.debug
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import java.security.PublicKey import java.security.PublicKey
import javax.persistence.Entity import javax.persistence.Entity
@ -54,10 +54,13 @@ class BftSmartNotaryService(
} }
private val notaryConfig = services.configuration.notary private val notaryConfig = services.configuration.notary
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") ?: throw IllegalArgumentException("Failed to register ${BftSmartNotaryService::class.java}: notary configuration not present")
private val bftSMaRtConfig = notaryConfig.bftSMaRt private val bftSMaRtConfig = try {
?: throw IllegalArgumentException("Failed to register ${this::class.java}: raft configuration not present") notaryConfig.extraConfig!!.parseAs<BFTSMaRtConfiguration>()
} catch (e: Exception) {
throw IllegalArgumentException("Failed to register ${BftSmartNotaryService::class.java}: BFT-Smart configuration not present")
}
private val cluster: BFTSMaRt.Cluster = makeBFTCluster(notaryIdentityKey, bftSMaRtConfig) private val cluster: BFTSMaRt.Cluster = makeBFTCluster(notaryIdentityKey, bftSMaRtConfig)

View File

@ -21,17 +21,17 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.Try import net.corda.core.utilities.Try
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.config.NotaryConfig import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.dummyCommand import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity import net.corda.testing.core.singleIdentity
import net.corda.testing.node.TestClock
import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.node.TestClock
import net.corda.testing.node.internal.* import net.corda.testing.node.internal.*
import org.hamcrest.Matchers.instanceOf import org.hamcrest.Matchers.instanceOf
import org.junit.AfterClass import org.junit.AfterClass
@ -64,7 +64,7 @@ class BFTNotaryServiceTests {
@JvmStatic @JvmStatic
fun before() { fun before() {
IntegrationTest.globalSetUp() //Enterprise only - remote db setup IntegrationTest.globalSetUp() //Enterprise only - remote db setup
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts")) mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts", "net.corda.notary.bftsmart"))
val clusterSize = minClusterSize(1) val clusterSize = minClusterSize(1)
val started = startBftClusterAndNode(clusterSize, mockNet) val started = startBftClusterAndNode(clusterSize, mockNet)
notary = started.first notary = started.first
@ -81,10 +81,10 @@ class BFTNotaryServiceTests {
fun startBftClusterAndNode(clusterSize: Int, mockNet: InternalMockNetwork, exposeRaces: Boolean = false): Pair<Party, TestStartedNode> { fun startBftClusterAndNode(clusterSize: Int, mockNet: InternalMockNetwork, exposeRaces: Boolean = false): Pair<Party, TestStartedNode> {
(Paths.get("config") / "currentView").deleteIfExists() // XXX: Make config object warn if this exists? (Paths.get("config") / "currentView").deleteIfExists() // XXX: Make config object warn if this exists?
val replicaIds = (0 until clusterSize) val replicaIds = (0 until clusterSize)
val serviceLegalName = CordaX500Name("BFT", "Zurich", "CH")
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity( val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) }, replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
CordaX500Name("BFT", "Zurich", "CH")) serviceLegalName)
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, false)))) val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, false))))
@ -94,8 +94,9 @@ class BFTNotaryServiceTests {
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = { mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
val notary = NotaryConfig( val notary = NotaryConfig(
validating = false, validating = false,
bftSMaRt = BFTSMaRtConfiguration(replicaId, clusterAddresses, exposeRaces = exposeRaces), extraConfig = BFTSMaRtConfiguration(replicaId, clusterAddresses, exposeRaces = exposeRaces).toConfig(),
className = "net.corda.notary.bftsmart.BftSmartNotaryService" className = "net.corda.notary.bftsmart.BftSmartNotaryService",
serviceLegalName = serviceLegalName
) )
doReturn(notary).whenever(it).notary doReturn(notary).whenever(it).notary
})) }))

View File

@ -0,0 +1,18 @@
package net.corda.notary.raft
import net.corda.core.utilities.NetworkHostAndPort
/** Configuration properties specific to the RaftNotaryService. */
data class RaftConfig(
/**
* The host and port to which to bind the embedded Raft server. Note that the Raft cluster uses a
* separate transport layer for communication that does not integrate with ArtemisMQ messaging services.
*/
val nodeAddress: NetworkHostAndPort,
/**
* Must list the addresses of all the members in the cluster. At least one of the members mustbe active and
* be able to communicate with the cluster leader for the node to join the cluster. If empty, a new cluster
* will be bootstrapped.
*/
val clusterAddresses: List<NetworkHostAndPort>
)

View File

@ -6,6 +6,7 @@ import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow
import net.corda.nodeapi.internal.config.parseAs
import java.security.PublicKey import java.security.PublicKey
/** A highly available notary service using the Raft algorithm to achieve consensus. */ /** A highly available notary service using the Raft algorithm to achieve consensus. */
@ -14,11 +15,14 @@ class RaftNotaryService(
override val notaryIdentityKey: PublicKey override val notaryIdentityKey: PublicKey
) : TrustedAuthorityNotaryService() { ) : TrustedAuthorityNotaryService() {
private val notaryConfig = services.configuration.notary private val notaryConfig = services.configuration.notary
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") ?: throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: notary configuration not present")
override val uniquenessProvider = with(services) { override val uniquenessProvider = with(services) {
val raftConfig = notaryConfig.raft val raftConfig = try {
?: throw IllegalArgumentException("Failed to register ${this::class.java}: raft configuration not present") notaryConfig.extraConfig!!.parseAs<RaftConfig>()
} catch (e: Exception) {
throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: raft configuration not present")
}
RaftUniquenessProvider( RaftUniquenessProvider(
configuration.baseDirectory, configuration.baseDirectory,
configuration.p2pSslOptions, configuration.p2pSslOptions,

View File

@ -26,7 +26,6 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.node.services.config.RaftConfig
import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence

View File

@ -126,8 +126,8 @@ internal constructor(private val initSerEnv: Boolean,
} }
return clusteredNotaries.groupBy { it.first }.map { (k, vs) -> return clusteredNotaries.groupBy { it.first }.map { (k, vs) ->
val cs = vs.map { it.second.config } val cs = vs.map { it.second.config }
if (cs.any { it.hasPath("notary.bftSMaRt") }) { if (cs.any { isBFTNotary(it) }) {
require(cs.all { it.hasPath("notary.bftSMaRt") }) { "Mix of BFT and non-BFT notaries with service name $k" } require(cs.all { isBFTNotary(it) }) { "Mix of BFT and non-BFT notaries with service name $k" }
NotaryCluster.BFT(k) to vs.map { it.second.directory } NotaryCluster.BFT(k) to vs.map { it.second.directory }
} else { } else {
NotaryCluster.CFT(k) to vs.map { it.second.directory } NotaryCluster.CFT(k) to vs.map { it.second.directory }
@ -135,6 +135,12 @@ internal constructor(private val initSerEnv: Boolean,
}.toMap() }.toMap()
} }
private fun isBFTNotary(config: Config): Boolean {
// TODO: pass a commandline parameter to the bootstrapper instead. Better yet, a notary config map
// specifying the notary identities and the type (single-node, CFT, BFT) of each notary to set up.
return config.getString ("notary.className").contains("BFT", true)
}
private fun generateServiceIdentitiesForNotaryClusters(configs: Map<Path, Config>) { private fun generateServiceIdentitiesForNotaryClusters(configs: Map<Path, Config>) {
notaryClusters(configs).forEach { (cluster, directories) -> notaryClusters(configs).forEach { (cluster, directories) ->
when (cluster) { when (cluster) {

View File

@ -88,12 +88,6 @@ class NodeCmdLineOptions {
) )
var justGenerateRpcSslCerts: Boolean = false var justGenerateRpcSslCerts: Boolean = false
@Option(
names = ["--bootstrap-raft-cluster"],
description = ["Bootstraps Raft cluster. The node forms a single node cluster (ignoring otherwise configured peer addresses), acting as a seed for other nodes to join the cluster."]
)
var bootstrapRaftCluster: Boolean = false
@Option( @Option(
names = ["-c", "--clear-network-map-cache"], names = ["-c", "--clear-network-map-cache"],
description = ["Clears local copy of network map, on node startup it will be restored from server or file system."] description = ["Clears local copy of network map, on node startup it will be restored from server or file system."]

View File

@ -270,7 +270,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
check(started == null) { "Node has already been started" } check(started == null) { "Node has already been started" }
log.info("Generating nodeInfo ...") log.info("Generating nodeInfo ...")
val trustRoot = initKeyStores() val trustRoot = initKeyStores()
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) val (identity, identityKeyPair) = obtainIdentity()
startDatabase() startDatabase()
val nodeCa = configuration.signingCertificateStore.get()[CORDA_CLIENT_CA] val nodeCa = configuration.signingCertificateStore.get()[CORDA_CLIENT_CA]
identityService.start(trustRoot, listOf(identity.certificate, nodeCa)) identityService.start(trustRoot, listOf(identity.certificate, nodeCa))
@ -325,7 +325,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
networkMapCache.start(netParams.notaries) networkMapCache.start(netParams.notaries)
startDatabase() startDatabase()
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) val (identity, identityKeyPair) = obtainIdentity()
identityService.start(trustRoot, listOf(identity.certificate, nodeCa)) identityService.start(trustRoot, listOf(identity.certificate, nodeCa))
val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
@ -417,8 +417,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val keyPairs = mutableSetOf(identityKeyPair) val keyPairs = mutableSetOf(identityKeyPair)
val myNotaryIdentity = configuration.notary?.let { val myNotaryIdentity = configuration.notary?.let {
if (it.isClusterConfig) { if (it.serviceLegalName != null) {
val (notaryIdentity, notaryIdentityKeyPair) = obtainIdentity(it) val (notaryIdentity, notaryIdentityKeyPair) = loadNotaryClusterIdentity(it.serviceLegalName)
keyPairs += notaryIdentityKeyPair keyPairs += notaryIdentityKeyPair
notaryIdentity notaryIdentity
} else { } else {
@ -870,24 +870,14 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
myNotaryIdentity: PartyAndCertificate?, myNotaryIdentity: PartyAndCertificate?,
networkParameters: NetworkParameters) networkParameters: NetworkParameters)
private fun obtainIdentity(notaryConfig: NotaryConfig?): Pair<PartyAndCertificate, KeyPair> { /** Loads or generates the node's legal identity and key-pair. */
private fun obtainIdentity(): Pair<PartyAndCertificate, KeyPair> {
val keyStore = configuration.signingCertificateStore.get() val keyStore = configuration.signingCertificateStore.get()
val legalName = configuration.myLegalName
val (id, singleName) = if (notaryConfig == null || !notaryConfig.isClusterConfig) {
// Node's main identity or if it's a single node notary.
Pair(NODE_IDENTITY_ALIAS_PREFIX, configuration.myLegalName)
} else {
// The node is part of a distributed notary whose identity must already be generated beforehand.
Pair(DISTRIBUTED_NOTARY_ALIAS_PREFIX, null)
}
// TODO: Integrate with Key management service? // TODO: Integrate with Key management service?
val privateKeyAlias = "$id-private-key" val privateKeyAlias = "$NODE_IDENTITY_ALIAS_PREFIX-private-key"
if (privateKeyAlias !in keyStore) { if (privateKeyAlias !in keyStore) {
// We shouldn't have a distributed notary at this stage, so singleName should NOT be null.
requireNotNull(singleName) {
"Unable to find in the key store the identity of the distributed notary the node is part of"
}
log.info("$privateKeyAlias not found in key store, generating fresh key!") log.info("$privateKeyAlias not found in key store, generating fresh key!")
keyStore.storeLegalIdentity(privateKeyAlias, generateKeyPair()) keyStore.storeLegalIdentity(privateKeyAlias, generateKeyPair())
} }
@ -895,30 +885,41 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val (x509Cert, keyPair) = keyStore.query { getCertificateAndKeyPair(privateKeyAlias, keyStore.entryPassword) } val (x509Cert, keyPair) = keyStore.query { getCertificateAndKeyPair(privateKeyAlias, keyStore.entryPassword) }
// TODO: Use configuration to indicate composite key should be used instead of public key for the identity. // TODO: Use configuration to indicate composite key should be used instead of public key for the identity.
val compositeKeyAlias = "$id-composite-key" val certificates = keyStore.query { getCertificateChain(privateKeyAlias) }
check(certificates.first() == x509Cert) {
"Certificates from key store do not line up!"
}
val subject = CordaX500Name.build(certificates.first().subjectX500Principal)
if (subject != legalName) {
throw ConfigurationException("The name '$legalName' for $NODE_IDENTITY_ALIAS_PREFIX doesn't match what's in the key store: $subject")
}
val certPath = X509Utilities.buildCertPath(certificates)
return Pair(PartyAndCertificate(certPath), keyPair)
}
/** Loads pre-generated notary service cluster identity. */
private fun loadNotaryClusterIdentity(serviceLegalName: CordaX500Name): Pair<PartyAndCertificate, KeyPair> {
val keyStore = configuration.signingCertificateStore.get()
val privateKeyAlias = "$DISTRIBUTED_NOTARY_ALIAS_PREFIX-private-key"
val keyPair = keyStore.query { getCertificateAndKeyPair(privateKeyAlias, keyStore.entryPassword) }.keyPair
val compositeKeyAlias = "$DISTRIBUTED_NOTARY_ALIAS_PREFIX-composite-key"
val certificates = if (compositeKeyAlias in keyStore) { val certificates = if (compositeKeyAlias in keyStore) {
// Use composite key instead if it exists.
val certificate = keyStore[compositeKeyAlias] val certificate = keyStore[compositeKeyAlias]
// We have to create the certificate chain for the composite key manually, this is because we don't have a keystore // We have to create the certificate chain for the composite key manually, this is because we don't have a keystore
// provider that understand compositeKey-privateKey combo. The cert chain is created using the composite key certificate + // provider that understand compositeKey-privateKey combo. The cert chain is created using the composite key certificate +
// the tail of the private key certificates, as they are both signed by the same certificate chain. // the tail of the private key certificates, as they are both signed by the same certificate chain.
listOf(certificate) + keyStore.query { getCertificateChain(privateKeyAlias) }.drop(1) listOf(certificate) + keyStore.query { getCertificateChain(privateKeyAlias) }.drop(1)
} else { } else throw IllegalStateException("The identity public key for the notary service $serviceLegalName was not found in the key store.")
keyStore.query { getCertificateChain(privateKeyAlias) }.let {
check(it[0] == x509Cert) { "Certificates from key store do not line up!" }
it
}
}
val subject = CordaX500Name.build(certificates[0].subjectX500Principal) val subject = CordaX500Name.build(certificates.first().subjectX500Principal)
if (singleName != null && subject != singleName) { if (subject != serviceLegalName) {
throw ConfigurationException("The name '$singleName' for $id doesn't match what's in the key store: $subject") throw ConfigurationException("The name of the notary service '$serviceLegalName' for $DISTRIBUTED_NOTARY_ALIAS_PREFIX doesn't " +
} else if (notaryConfig != null && notaryConfig.isClusterConfig && notaryConfig.serviceLegalName != null && subject != notaryConfig.serviceLegalName) {
// Note that we're not checking if `notaryConfig.serviceLegalName` is not present for backwards compatibility.
throw ConfigurationException("The name of the notary service '${notaryConfig.serviceLegalName}' for $id doesn't " +
"match what's in the key store: $subject. You might need to adjust the configuration of `notary.serviceLegalName`.") "match what's in the key store: $subject. You might need to adjust the configuration of `notary.serviceLegalName`.")
} }
val certPath = X509Utilities.buildCertPath(certificates) val certPath = X509Utilities.buildCertPath(certificates)
return Pair(PartyAndCertificate(certPath), keyPair) return Pair(PartyAndCertificate(certPath), keyPair)
} }

View File

@ -18,7 +18,6 @@ import net.corda.node.*
import net.corda.node.internal.Node.Companion.isValidJavaVersion import net.corda.node.internal.Node.Companion.isValidJavaVersion
import net.corda.node.internal.cordapp.MultipleCordappsForFlowException import net.corda.node.internal.cordapp.MultipleCordappsForFlowException
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NodeConfigurationImpl
import net.corda.node.services.config.shouldStartLocalShell import net.corda.node.services.config.shouldStartLocalShell
import net.corda.node.services.config.shouldStartSSHDaemon import net.corda.node.services.config.shouldStartSSHDaemon
import net.corda.node.utilities.createKeyPairAndSelfSignedTLSCertificate import net.corda.node.utilities.createKeyPairAndSelfSignedTLSCertificate
@ -27,7 +26,6 @@ import net.corda.node.utilities.registration.NodeRegistrationException
import net.corda.node.utilities.registration.NodeRegistrationHelper import net.corda.node.utilities.registration.NodeRegistrationHelper
import net.corda.node.utilities.saveToKeyStore import net.corda.node.utilities.saveToKeyStore
import net.corda.node.utilities.saveToTrustStore import net.corda.node.utilities.saveToTrustStore
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.nodeapi.internal.addShutdownHook import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.config.UnknownConfigurationKeysException import net.corda.nodeapi.internal.config.UnknownConfigurationKeysException
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
@ -191,14 +189,7 @@ open class NodeStartup : CordaCliWrapper("corda", "Runs a Corda Node") {
if (cmdLineOptions.devMode == true) { if (cmdLineOptions.devMode == true) {
println("Config:\n${rawConfig.root().render(ConfigRenderOptions.defaults())}") println("Config:\n${rawConfig.root().render(ConfigRenderOptions.defaults())}")
} }
val configuration = configurationResult.getOrThrow() return configurationResult.getOrThrow()
return if (cmdLineOptions.bootstrapRaftCluster) {
println("Bootstrapping raft cluster (starting up as seed node).")
// Ignore the configured clusterAddresses to make the node bootstrap a cluster instead of joining.
(configuration as NodeConfigurationImpl).copy(notary = configuration.notary?.copy(raft = configuration.notary?.raft?.copy(clusterAddresses = emptyList())))
} else {
configuration
}
} }
private fun checkRegistrationMode(): Boolean { private fun checkRegistrationMode(): Boolean {

View File

@ -11,12 +11,7 @@ import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.config.rpc.NodeRpcOptions import net.corda.node.services.config.rpc.NodeRpcOptions
import net.corda.nodeapi.BrokerRpcSslOptions import net.corda.nodeapi.BrokerRpcSslOptions
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier import net.corda.nodeapi.internal.config.*
import net.corda.nodeapi.internal.config.SslConfiguration
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigTag import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigTag
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.tools.shell.SSHDConfiguration import net.corda.tools.shell.SSHDConfiguration
@ -79,7 +74,7 @@ interface NodeConfiguration {
val flowMonitorPeriodMillis: Duration get() = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS val flowMonitorPeriodMillis: Duration get() = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS
val flowMonitorSuspensionLoggingThresholdMillis: Duration get() = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS val flowMonitorSuspensionLoggingThresholdMillis: Duration get() = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS
val crlCheckSoftFail: Boolean val crlCheckSoftFail: Boolean
val jmxReporterType : JmxReporterType? get() = defaultJmxReporterType val jmxReporterType: JmxReporterType? get() = defaultJmxReporterType
val baseDirectory: Path val baseDirectory: Path
val certificatesDirectory: Path val certificatesDirectory: Path
@ -132,71 +127,16 @@ fun NodeConfiguration.shouldStartSSHDaemon() = this.sshd != null
fun NodeConfiguration.shouldStartLocalShell() = !this.noLocalShell && System.console() != null && this.devMode fun NodeConfiguration.shouldStartLocalShell() = !this.noLocalShell && System.console() != null && this.devMode
fun NodeConfiguration.shouldInitCrashShell() = shouldStartLocalShell() || shouldStartSSHDaemon() fun NodeConfiguration.shouldInitCrashShell() = shouldStartLocalShell() || shouldStartSSHDaemon()
data class NotaryConfig(val validating: Boolean, data class NotaryConfig(
val raft: RaftConfig? = null, /** Specifies whether the notary validates transactions or not. */
val bftSMaRt: BFTSMaRtConfiguration? = null, val validating: Boolean,
val mysql: MySQLConfiguration? = null, /** The legal name of cluster in case of a distributed notary service. */
val serviceLegalName: CordaX500Name? = null, val serviceLegalName: CordaX500Name? = null,
/** The name of the notary service class to load. */
val className: String = "net.corda.node.services.transactions.SimpleNotaryService", val className: String = "net.corda.node.services.transactions.SimpleNotaryService",
val batchSize: Int = 128, /** Notary implementation-specific configuration parameters. */
val batchTimeoutMs: Long = 1L, val extraConfig: Config? = null
val maxInputStates: Int = 2000, )
val maxDBTransactionRetryCount: Int = 10,
val backOffBaseMs: Long = 20L
) {
init {
require(raft == null || bftSMaRt == null || mysql == null) {
"raft, bftSMaRt, and mysql configs cannot be specified together"
}
}
val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null || mysql != null
}
data class MySQLConfiguration(
val dataSource: Properties,
/**
* Number of times to attempt to reconnect to the database.
*/
val connectionRetries: Int = 2, // Default value for a 3 server cluster.
/**
* Time increment between re-connection attempts.
*
* The total back-off duration is calculated as: backOffIncrement * backOffBase ^ currentRetryCount
*/
val backOffIncrement: Int = 500,
/** Exponential back-off multiplier base. */
val backOffBase: Double = 1.5,
/** The maximum number of transactions processed in a single batch. */
val maxBatchSize: Int = 500,
/** The maximum combined number of input states processed in a single batch. */
val maxBatchInputStates: Int = 10_000,
/** A batch will be processed after a specified timeout even if it has not yet reached full capacity. */
val batchTimeoutMs: Long = 200,
/**
* The maximum number of commit requests in flight. Once the capacity is reached the service will block on
* further commit requests.
*/
val maxQueueSize: Int = 100_000
) {
init {
require(connectionRetries >= 0) { "connectionRetries cannot be negative" }
}
}
data class RaftConfig(val nodeAddress: NetworkHostAndPort, val clusterAddresses: List<NetworkHostAndPort>)
/** @param exposeRaces for testing only, so its default is not in reference.conf but here. */
data class BFTSMaRtConfiguration(
val replicaId: Int,
val clusterAddresses: List<NetworkHostAndPort>,
val debug: Boolean = false,
val exposeRaces: Boolean = false
) {
init {
require(replicaId >= 0) { "replicaId cannot be negative" }
}
}
/** /**
* Used as an alternative to the older compatibilityZoneURL to allow the doorman and network map * Used as an alternative to the older compatibilityZoneURL to allow the doorman and network map
@ -217,7 +157,7 @@ data class NetworkServicesConfig(
val doormanURL: URL, val doormanURL: URL,
val networkMapURL: URL, val networkMapURL: URL,
val pnm: UUID? = null, val pnm: UUID? = null,
val inferred : Boolean = false val inferred: Boolean = false
) )
/** /**
@ -416,7 +356,7 @@ data class NodeConfigurationImpl(
override val effectiveH2Settings: NodeH2Settings? override val effectiveH2Settings: NodeH2Settings?
get() = when { get() = when {
h2port != null -> NodeH2Settings(address = NetworkHostAndPort(host="localhost", port=h2port)) h2port != null -> NodeH2Settings(address = NetworkHostAndPort(host = "localhost", port = h2port))
else -> h2Settings else -> h2Settings
} }
@ -453,7 +393,7 @@ data class NodeConfigurationImpl(
// Check for usage of deprecated config // Check for usage of deprecated config
@Suppress("DEPRECATION") @Suppress("DEPRECATION")
if(certificateChainCheckPolicies.isNotEmpty()) { if (certificateChainCheckPolicies.isNotEmpty()) {
logger.warn("""You are configuring certificateChainCheckPolicies. This is a setting that is not used, and will be removed in a future version. logger.warn("""You are configuring certificateChainCheckPolicies. This is a setting that is not used, and will be removed in a future version.
|Please contact the R3 team on the public slack to discuss your use case. |Please contact the R3 team on the public slack to discuss your use case.
""".trimMargin()) """.trimMargin())

View File

@ -6,11 +6,6 @@
required: false required: false
multiParam: true multiParam: true
acceptableValues: [] acceptableValues: []
- parameterName: "--bootstrap-raft-cluster"
parameterType: "boolean"
required: false
multiParam: false
acceptableValues: []
- parameterName: "--clear-network-map-cache" - parameterName: "--clear-network-map-cache"
parameterType: "boolean" parameterType: "boolean"
required: false required: false

View File

@ -35,7 +35,6 @@ class NodeStartupTest {
assertThat(startup.cmdLineOptions.sshdServer).isEqualTo(false) assertThat(startup.cmdLineOptions.sshdServer).isEqualTo(false)
assertThat(startup.cmdLineOptions.justGenerateNodeInfo).isEqualTo(false) assertThat(startup.cmdLineOptions.justGenerateNodeInfo).isEqualTo(false)
assertThat(startup.cmdLineOptions.justGenerateRpcSslCerts).isEqualTo(false) assertThat(startup.cmdLineOptions.justGenerateRpcSslCerts).isEqualTo(false)
assertThat(startup.cmdLineOptions.bootstrapRaftCluster).isEqualTo(false)
assertThat(startup.cmdLineOptions.unknownConfigKeysPolicy).isEqualTo(UnknownConfigKeysPolicy.FAIL) assertThat(startup.cmdLineOptions.unknownConfigKeysPolicy).isEqualTo(UnknownConfigKeysPolicy.FAIL)
assertThat(startup.cmdLineOptions.devMode).isEqualTo(null) assertThat(startup.cmdLineOptions.devMode).isEqualTo(null)
assertThat(startup.cmdLineOptions.clearNetworkMapCache).isEqualTo(false) assertThat(startup.cmdLineOptions.clearNetworkMapCache).isEqualTo(false)

View File

@ -54,13 +54,14 @@ class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() {
private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair<Party, TestStartedNode> { private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair<Party, TestStartedNode> {
val replicaIds = (0 until clusterSize) val replicaIds = (0 until clusterSize)
val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH")
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity( val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) }, replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
CordaX500Name("Custom Notary", "Zurich", "CH")) serviceLegalName)
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true)))) val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true))))
val notaryConfig = mock<NotaryConfig> { val notaryConfig = mock<NotaryConfig> {
whenever(it.isClusterConfig).thenReturn(true) whenever(it.serviceLegalName).thenReturn(serviceLegalName)
whenever(it.validating).thenReturn(true) whenever(it.validating).thenReturn(true)
whenever(it.className).thenReturn(TimedFlowTests.TestNotaryService::class.java.name) whenever(it.className).thenReturn(TimedFlowTests.TestNotaryService::class.java.name)
} }

View File

@ -0,0 +1,9 @@
package net.corda.notary.jpa
data class JPANotaryConfiguration(
val batchSize: Int = 128,
val batchTimeoutMs: Long = 1L,
val maxInputStates: Int = 2000,
val maxDBTransactionRetryCount: Int = 10,
val backOffBaseMs: Long = 20L
)

View File

@ -6,6 +6,7 @@ import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow
import net.corda.nodeapi.internal.config.parseAs
import java.security.PublicKey import java.security.PublicKey
/** Notary service backed by a replicated MySQL database. */ /** Notary service backed by a replicated MySQL database. */
@ -17,7 +18,12 @@ class JPANotaryService(
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
override val asyncUniquenessProvider = with(services) { override val asyncUniquenessProvider = with(services) {
JPAUniquenessProvider(services.clock, services.database, notaryConfig) val jpaNotaryConfig = try {
notaryConfig.extraConfig!!.parseAs<JPANotaryConfiguration>()
} catch (e: Exception) {
throw IllegalArgumentException("Failed to register ${JPANotaryService::class.java}: JPA notary configuration not present")
}
JPAUniquenessProvider(services.clock, services.database, jpaNotaryConfig)
} }
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow { override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {

View File

@ -12,14 +12,16 @@ import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.* import net.corda.core.internal.notary.AsyncUniquenessProvider
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.serialization.internal.CordaSerializationEncoding import net.corda.serialization.internal.CordaSerializationEncoding
@ -36,7 +38,7 @@ import kotlin.concurrent.thread
/** A JPA backed Uniqueness provider */ /** A JPA backed Uniqueness provider */
@ThreadSafe @ThreadSafe
class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val notaryConfig: NotaryConfig) : AsyncUniquenessProvider, SingletonSerializeAsToken() { class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val config: JPANotaryConfiguration) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
// TODO: test vs. MySQLUniquenessProvider // TODO: test vs. MySQLUniquenessProvider
@ -49,7 +51,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
@CordaSerializable @CordaSerializable
class Request( class Request(
@Id @Id
@Column(nullable = true, length=76) @Column(nullable = true, length = 76)
var id: String? = null, var id: String? = null,
@Column(name = "consuming_transaction_id", nullable = true, length = 64) @Column(name = "consuming_transaction_id", nullable = true, length = 64)
@ -79,7 +81,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
@Entity @Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_committed_states") @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_committed_states")
@NamedQuery(name = "CommittedState.select", query="SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids") @NamedQuery(name = "CommittedState.select", query = "SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids")
class CommittedState( class CommittedState(
@Id @Id
@Column(name = "state_ref", length = 73) @Column(name = "state_ref", length = 73)
@ -96,7 +98,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
try { try {
val buffer = LinkedList<CommitRequest>() val buffer = LinkedList<CommitRequest>()
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
val drainedSize = Queues.drain(requestQueue, buffer, notaryConfig.batchSize, notaryConfig.batchTimeoutMs, TimeUnit.MILLISECONDS) val drainedSize = Queues.drain(requestQueue, buffer, config.batchSize, config.batchTimeoutMs, TimeUnit.MILLISECONDS)
if (drainedSize == 0) continue if (drainedSize == 0) continue
processRequests(buffer) processRequests(buffer)
buffer.clear() buffer.clear()
@ -173,7 +175,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
val ids = (states + references).map { encodeStateRef(it) }.toSet() val ids = (states + references).map { encodeStateRef(it) }.toSet()
val committedStates = mutableListOf<CommittedState>() val committedStates = mutableListOf<CommittedState>()
for (idsBatch in ids.chunked(notaryConfig.maxInputStates)) { for (idsBatch in ids.chunked(config.maxInputStates)) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
val existing = session.createNamedQuery("CommittedState.select").setParameter("ids", idsBatch).resultList as List<CommittedState> val existing = session.createNamedQuery("CommittedState.select").setParameter("ids", idsBatch).resultList as List<CommittedState>
committedStates.addAll(existing) committedStates.addAll(existing)
@ -192,8 +194,8 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
private fun withRetry(block: () -> Unit) { private fun withRetry(block: () -> Unit) {
var retryCount = 0 var retryCount = 0
var backOff = notaryConfig.backOffBaseMs var backOff = config.backOffBaseMs
while (retryCount < notaryConfig.maxDBTransactionRetryCount) { while (retryCount < config.maxDBTransactionRetryCount) {
try { try {
block() block()
break break
@ -263,7 +265,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
} }
} }
} }
} catch(e: Exception) { } catch (e: Exception) {
log.warn("Error processing commit requests", e) log.warn("Error processing commit requests", e)
for (request in requests) { for (request in requests) {
respondWithError(request, e) respondWithError(request, e)

View File

@ -10,10 +10,11 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.notary.NotaryInternalException import net.corda.core.internal.notary.NotaryInternalException
import net.corda.node.services.config.NotaryConfig import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.NodeSchemaService
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef import net.corda.nodeapi.internal.config.toConfig
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.encodeStateRef
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.encodeStateRef
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity import net.corda.testing.core.TestIdentity
import net.corda.testing.core.generateStateRef import net.corda.testing.core.generateStateRef
@ -35,7 +36,8 @@ class JPAUniquenessProviderTests {
private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
private val txID = SecureHash.randomSHA256() private val txID = SecureHash.randomSHA256()
private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0) private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0)
private val notaryConfig = NotaryConfig(validating=false, maxInputStates = 10) private val notaryConfig = JPANotaryConfiguration(maxInputStates = 10)
private lateinit var database: CordaPersistence private lateinit var database: CordaPersistence
@ -86,7 +88,7 @@ class JPAUniquenessProviderTests {
@Test @Test
fun `all conflicts are found with batching`() { fun `all conflicts are found with batching`() {
val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates/2 val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates / 2
val stateRefs = (1..nrStates).map { generateStateRef() } val stateRefs = (1..nrStates).map { generateStateRef() }
println(stateRefs.size) println(stateRefs.size)
val firstTxId = SecureHash.randomSHA256() val firstTxId = SecureHash.randomSHA256()

View File

@ -0,0 +1,34 @@
package net.corda.notary.mysql
import java.util.*
data class MySQLNotaryConfiguration(
val dataSource: Properties,
/**
* Number of times to attempt to reconnect to the database.
*/
val connectionRetries: Int = 2, // Default value for a 3 server cluster.
/**
* Time increment between re-connection attempts.
*
* The total back-off duration is calculated as: backOffIncrement * backOffBase ^ currentRetryCount
*/
val backOffIncrement: Int = 500,
/** Exponential back-off multiplier base. */
val backOffBase: Double = 1.5,
/** The maximum number of transactions processed in a single batch. */
val maxBatchSize: Int = 500,
/** The maximum combined number of input states processed in a single batch. */
val maxBatchInputStates: Int = 10_000,
/** A batch will be processed after a specified timeout even if it has not yet reached full capacity. */
val batchTimeoutMs: Long = 200,
/**
* The maximum number of commit requests in flight. Once the capacity is reached the service will block on
* further commit requests.
*/
val maxQueueSize: Int = 100_000
) {
init {
require(connectionRetries >= 0) { "connectionRetries cannot be negative" }
}
}

View File

@ -6,6 +6,7 @@ import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow
import net.corda.nodeapi.internal.config.parseAs
import java.security.PublicKey import java.security.PublicKey
/** Notary service backed by a replicated MySQL database. */ /** Notary service backed by a replicated MySQL database. */
@ -20,8 +21,11 @@ class MySQLNotaryService(
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
override val asyncUniquenessProvider = with(services) { override val asyncUniquenessProvider = with(services) {
val mysqlConfig = notaryConfig.mysql val mysqlConfig = try {
?: throw IllegalArgumentException("Failed to register ${this::class.java}: raft configuration not present") notaryConfig.extraConfig!!.parseAs<MySQLNotaryConfiguration>()
} catch (e: Exception) {
throw IllegalArgumentException("Failed to register ${MySQLNotaryService::class.java}: mysql configuration not present")
}
MySQLUniquenessProvider( MySQLUniquenessProvider(
services.monitoringService.metrics, services.monitoringService.metrics,
services.clock, services.clock,

View File

@ -30,7 +30,6 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.node.services.config.MySQLConfiguration
import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY
import java.sql.* import java.sql.*
import java.time.Clock import java.time.Clock
@ -49,7 +48,7 @@ import kotlin.concurrent.thread
class MySQLUniquenessProvider( class MySQLUniquenessProvider(
metrics: MetricRegistry, metrics: MetricRegistry,
val clock: Clock, val clock: Clock,
val config: MySQLConfiguration val config: MySQLNotaryConfiguration
) : AsyncUniquenessProvider, SingletonSerializeAsToken() { ) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
companion object { companion object {
private val log = loggerFor<MySQLUniquenessProvider>() private val log = loggerFor<MySQLUniquenessProvider>()

View File

@ -22,9 +22,9 @@ import net.corda.core.node.NotaryInfo
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.config.MySQLConfiguration
import net.corda.node.services.config.NotaryConfig import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyContract
@ -256,7 +256,8 @@ class MySQLNotaryServiceTests : IntegrationTest() {
configOverrides = { configOverrides = {
val notaryConfig = NotaryConfig( val notaryConfig = NotaryConfig(
validating = true, validating = true,
mysql = MySQLConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100), extraConfig = MySQLNotaryConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100).toConfig(),
serviceLegalName = notaryName,
className = MySQLNotaryService::class.java.name className = MySQLNotaryService::class.java.name
) )
doReturn(notaryConfig).whenever(it).notary doReturn(notaryConfig).whenever(it).notary

View File

@ -112,7 +112,7 @@ task deployNodesRaft(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [ notary = [
validating: true, validating: true,
serviceLegalName: "O=Raft,L=Zurich,C=CH", serviceLegalName: "O=Raft,L=Zurich,C=CH",
raft: [ extraConfig: [
nodeAddress: "localhost:10008" nodeAddress: "localhost:10008"
], ],
className: className className: className
@ -128,7 +128,7 @@ task deployNodesRaft(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [ notary = [
validating: true, validating: true,
serviceLegalName: "O=Raft,L=Zurich,C=CH", serviceLegalName: "O=Raft,L=Zurich,C=CH",
raft: [ extraConfig: [
nodeAddress: "localhost:10012", nodeAddress: "localhost:10012",
clusterAddresses: ["localhost:10008"] clusterAddresses: ["localhost:10008"]
], ],
@ -145,7 +145,7 @@ task deployNodesRaft(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [ notary = [
validating: true, validating: true,
serviceLegalName: "O=Raft,L=Zurich,C=CH", serviceLegalName: "O=Raft,L=Zurich,C=CH",
raft: [ extraConfig: [
nodeAddress: "localhost:10016", nodeAddress: "localhost:10016",
clusterAddresses: ["localhost:10008"] clusterAddresses: ["localhost:10008"]
], ],
@ -181,7 +181,7 @@ task deployNodesBFT(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [ notary = [
validating: false, validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH", serviceLegalName: "O=BFT,L=Zurich,C=CH",
bftSMaRt: [ extraConfig: [
replicaId: 0, replicaId: 0,
clusterAddresses: clusterAddresses clusterAddresses: clusterAddresses
], ],
@ -198,7 +198,7 @@ task deployNodesBFT(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [ notary = [
validating: false, validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH", serviceLegalName: "O=BFT,L=Zurich,C=CH",
bftSMaRt: [ extraConfig: [
replicaId: 1, replicaId: 1,
clusterAddresses: clusterAddresses clusterAddresses: clusterAddresses
], ],
@ -215,7 +215,7 @@ task deployNodesBFT(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [ notary = [
validating: false, validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH", serviceLegalName: "O=BFT,L=Zurich,C=CH",
bftSMaRt: [ extraConfig: [
replicaId: 2, replicaId: 2,
clusterAddresses: clusterAddresses clusterAddresses: clusterAddresses
], ],
@ -232,7 +232,7 @@ task deployNodesBFT(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
notary = [ notary = [
validating: false, validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH", serviceLegalName: "O=BFT,L=Zurich,C=CH",
bftSMaRt: [ extraConfig: [
replicaId: 3, replicaId: 3,
clusterAddresses: clusterAddresses clusterAddresses: clusterAddresses
], ],

View File

@ -506,29 +506,30 @@ class DriverDSLImpl(
} }
} }
// TODO This mapping is done is several places including the gradle plugin. In general we need a better way of
// generating the configs for the nodes, probably making use of Any.toConfig()
private fun NotaryConfig.toConfigMap(): Map<String, Any> = mapOf("notary" to toConfig().root().unwrapped())
private fun startSingleNotary(spec: NotarySpec, localNetworkMap: LocalNetworkMap?, customOverrides: Map<String, Any?>): CordaFuture<List<NodeHandle>> { private fun startSingleNotary(spec: NotarySpec, localNetworkMap: LocalNetworkMap?, customOverrides: Map<String, Any?>): CordaFuture<List<NodeHandle>> {
val notaryConfig = mapOf("notary" to mapOf("validating" to spec.validating))
return startRegisteredNode( return startRegisteredNode(
spec.name, spec.name,
localNetworkMap, localNetworkMap,
spec.rpcUsers, spec.rpcUsers,
spec.verifierType, spec.verifierType,
customOverrides = NotaryConfig(spec.validating).toConfigMap() + customOverrides customOverrides = notaryConfig + customOverrides
).map { listOf(it) } ).map { listOf(it) }
} }
private fun startRaftNotaryCluster(spec: NotarySpec, localNetworkMap: LocalNetworkMap?): CordaFuture<List<NodeHandle>> { private fun startRaftNotaryCluster(spec: NotarySpec, localNetworkMap: LocalNetworkMap?): CordaFuture<List<NodeHandle>> {
fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map<String, Any> { fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map<String, Any> {
val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList() val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList()
val config = NotaryConfig( val config = configOf("notary" to mapOf(
validating = spec.validating, "validating" to spec.validating,
serviceLegalName = spec.name, "serviceLegalName" to spec.name.toString(),
className = "net.corda.notary.raft.RaftNotaryService", "className" to "net.corda.notary.raft.RaftNotaryService",
raft = RaftConfig(nodeAddress = nodeAddress, clusterAddresses = clusterAddresses)) "extraConfig" to mapOf(
return config.toConfigMap() "nodeAddress" to nodeAddress.toString(),
"clusterAddresses" to clusterAddresses.map { it.toString() }
))
)
return config.root().unwrapped()
} }
val nodeNames = generateNodeNames(spec) val nodeNames = generateNodeNames(spec)

View File

@ -12,9 +12,9 @@ import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService import net.corda.core.node.services.CordaService
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.MySQLConfiguration
import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.nodeapi.internal.config.parseAs import net.corda.nodeapi.internal.config.parseAs
import net.corda.notary.mysql.MySQLNotaryConfiguration
import net.corda.notary.mysql.MySQLUniquenessProvider import net.corda.notary.mysql.MySQLUniquenessProvider
import net.corda.notarytest.flows.AsyncLoadTestFlow import net.corda.notarytest.flows.AsyncLoadTestFlow
import java.net.InetAddress import java.net.InetAddress
@ -61,7 +61,7 @@ class JDBCNotaryService(override val services: AppServiceHub, override val notar
} }
private fun createUniquenessProvider(): MySQLUniquenessProvider { private fun createUniquenessProvider(): MySQLUniquenessProvider {
val mysqlConfig = appConfig.getConfig("mysql").parseAs<MySQLConfiguration>() val mysqlConfig = appConfig.getConfig("mysql").parseAs<MySQLNotaryConfiguration>()
return MySQLUniquenessProvider(createMetricsRegistry(), services.clock, mysqlConfig) return MySQLUniquenessProvider(createMetricsRegistry(), services.clock, mysqlConfig)
} }
} }