ENT-1232: Add MySQL notary service (#187)

* Add MySQL notary service. This is the first iteration of a HA notary service,
and is intended to replace the Raft notary. The backing database should
be a replicated MySQL implementation such as Percona XtraDB Cluster.

Handle MySQLTransactionRollbackException

Add the `mysql-connector-java` to the build file of the node package.

Add Hikari connection pool

* Store requesting party name & key in separate columns (for consistency
with existing uniqueness providers)

* Add idempotency test and additional counters
This commit is contained in:
Andrius Dagys 2017-12-18 09:47:07 +00:00 committed by GitHub
parent a44960da38
commit e57dab6fcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 400 additions and 16 deletions

View File

@ -16,13 +16,21 @@ import java.security.PublicKey
abstract class NotaryService : SingletonSerializeAsToken() { abstract class NotaryService : SingletonSerializeAsToken() {
companion object { companion object {
const val ID_PREFIX = "corda.notary." const val ID_PREFIX = "corda.notary."
fun constructId(validating: Boolean, raft: Boolean = false, bft: Boolean = false, custom: Boolean = false): String { @JvmOverloads
require(Booleans.countTrue(raft, bft, custom) <= 1) { "At most one of raft, bft or custom may be true" } fun constructId(
validating: Boolean,
raft: Boolean = false,
bft: Boolean = false,
custom: Boolean = false,
mysql: Boolean = false
): String {
require(Booleans.countTrue(raft, bft, custom, mysql) <= 1) { "At most one of raft, bft, mysql or custom may be true" }
return StringBuffer(ID_PREFIX).apply { return StringBuffer(ID_PREFIX).apply {
append(if (validating) "validating" else "simple") append(if (validating) "validating" else "simple")
if (raft) append(".raft") if (raft) append(".raft")
if (bft) append(".bft") if (bft) append(".bft")
if (custom) append(".custom") if (custom) append(".custom")
if (mysql) append(".mysql")
}.toString() }.toString()
} }
} }

View File

@ -127,6 +127,8 @@ dependencies {
compile "org.postgresql:postgresql:$postgresql_version" compile "org.postgresql:postgresql:$postgresql_version"
//For Azure SQL and SQL Server support in persistence //For Azure SQL and SQL Server support in persistence
compile 'com.microsoft.sqlserver:mssql-jdbc:6.2.1.jre8' compile 'com.microsoft.sqlserver:mssql-jdbc:6.2.1.jre8'
// For the MySQLUniquenessProvider
compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6'
// SQL connection pooling library // SQL connection pooling library
compile "com.zaxxer:HikariCP:2.5.1" compile "com.zaxxer:HikariCP:2.5.1"

View File

@ -0,0 +1,154 @@
package net.corda.node.services
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.StartedNode
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.NetworkParametersCopier
import net.corda.nodeapi.internal.NotaryInfo
import net.corda.nodeapi.internal.ServiceIdentityGenerator
import net.corda.testing.*
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.startFlow
import org.junit.After
import org.junit.Before
import org.junit.ClassRule
import org.junit.Test
import java.math.BigInteger
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class MySQLNotaryServiceTests : IntegrationTest() {
companion object {
val notaryName = CordaX500Name("MySQL Notary Service", "Zurich", "CH")
@ClassRule
@JvmField
val databaseSchemas = IntegrationTestSchemas("node_0", DUMMY_NOTARY_NAME.toDatabaseSchemaName())
}
private lateinit var mockNet: MockNetwork
private lateinit var node: StartedNode<MockNetwork.MockNode>
private lateinit var notaryParty: Party
private lateinit var notaryNode: StartedNode<MockNetwork.MockNode>
@Before
fun before() {
mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts"))
notaryParty = ServiceIdentityGenerator.generateToDisk(
listOf(mockNet.baseDirectory(mockNet.nextNodeId)),
notaryName,
"identity"
)
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryParty, false))))
val notaryNodeUnstarted = createNotaryNode()
val nodeUnstarted = mockNet.createUnstartedNode()
val startedNodes = listOf(notaryNodeUnstarted, nodeUnstarted).map { n ->
networkParameters.install(mockNet.baseDirectory(n.id))
n.start()
}
notaryNode = startedNodes.first()
node = startedNodes.last()
}
@After
fun stopNodes() {
mockNet.stopNodes()
}
@Test
fun `detect double spend`() {
val inputState = issueState(node, notaryParty)
val firstTxBuilder = TransactionBuilder(notaryParty)
.addInputState(inputState)
.addCommand(dummyCommand(node.services.myInfo.chooseIdentity().owningKey))
val firstSpendTx = node.services.signInitialTransaction(firstTxBuilder)
val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx))
mockNet.runNetwork()
firstSpend.resultFuture.getOrThrow()
val secondSpendBuilder = TransactionBuilder(notaryParty).withItems(inputState).run {
val dummyState = DummyContract.SingleOwnerState(0, node.info.chooseIdentity())
addOutputState(dummyState, DummyContract.PROGRAM_ID)
addCommand(dummyCommand(node.services.myInfo.chooseIdentity().owningKey))
this
}
val secondSpendTx = node.services.signInitialTransaction(secondSpendBuilder)
val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx))
mockNet.runNetwork()
val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() }
val error = ex.error as NotaryError.Conflict
assertEquals(error.txId, secondSpendTx.id)
}
@Test
fun `notarisations are idempotent`() {
val inputState = issueState(node, notaryParty)
val txBuilder = TransactionBuilder(notaryParty)
.addInputState(inputState)
.addCommand(dummyCommand(node.services.myInfo.chooseIdentity().owningKey))
val spendTx = node.services.signInitialTransaction(txBuilder)
val notarise = node.services.startFlow(NotaryFlow.Client(spendTx))
mockNet.runNetwork()
val signature = notarise.resultFuture.get().single()
val notariseRetry = node.services.startFlow(NotaryFlow.Client(spendTx))
mockNet.runNetwork()
val signatureRetry = notariseRetry.resultFuture.get().single()
fun checkSignature(signature: TransactionSignature) {
signature.verify(spendTx.id)
assertEquals(notaryParty.owningKey, signature.by)
}
checkSignature(signature)
checkSignature(signatureRetry)
}
private fun createNotaryNode(): MockNetwork.MockNode {
val dataStoreProperties = makeTestDataSourceProperties().apply {
setProperty("autoCommit", "false")
}
return mockNet.createUnstartedNode(
MockNodeParameters(
legalName = notaryName,
entropyRoot = BigInteger.valueOf(60L),
configOverrides = {
val notaryConfig = NotaryConfig(validating = false, mysql = dataStoreProperties)
doReturn(notaryConfig).whenever(it).notary
}
)
)
}
private fun issueState(node: StartedNode<*>, notary: Party): StateAndRef<*> {
return node.database.transaction {
val builder = DummyContract.generateInitial(Random().nextInt(), notary, node.info.chooseIdentity().ref(0))
val stx = node.services.signInitialTransaction(builder)
node.services.recordTransactions(stx)
StateAndRef(builder.outputStates().first(), StateRef(stx.id, 0))
}
}
}

View File

@ -668,14 +668,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
private fun makeCoreNotaryService(notaryConfig: NotaryConfig, database: CordaPersistence): NotaryService { private fun makeCoreNotaryService(notaryConfig: NotaryConfig, database: CordaPersistence): NotaryService {
val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service") val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service")
return notaryConfig.run { return notaryConfig.run {
if (raft != null) { when {
raft != null -> {
val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft) val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft)
(if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider) (if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider)
} else if (bftSMaRt != null) { }
bftSMaRt != null -> {
if (validating) throw IllegalArgumentException("Validating BFTSMaRt notary not supported") if (validating) throw IllegalArgumentException("Validating BFTSMaRt notary not supported")
BFTNonValidatingNotaryService(services, notaryKey, bftSMaRt, makeBFTCluster(notaryKey, bftSMaRt)) BFTNonValidatingNotaryService(services, notaryKey, bftSMaRt, makeBFTCluster(notaryKey, bftSMaRt))
} else { }
(if (validating) ::ValidatingNotaryService else ::SimpleNotaryService)(services, notaryKey) mysql != null -> {
(if (validating) ::MySQLValidatingNotaryService else ::MySQLNonValidatingNotaryService)(services, notaryKey, mysql, configuration.devMode)
}
else -> (if (validating) ::ValidatingNotaryService else ::SimpleNotaryService)(services, notaryKey)
} }
} }
} }
@ -725,7 +730,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
Pair("identity", myLegalName) Pair("identity", myLegalName)
} else { } else {
val notaryId = notaryConfig.run { val notaryId = notaryConfig.run {
NotaryService.constructId(validating, raft != null, bftSMaRt != null, custom) NotaryService.constructId(validating, raft != null, bftSMaRt != null, custom, mysql != null)
} }
// The node is part of a distributed notary whose identity must already be generated beforehand. // The node is part of a distributed notary whose identity must already be generated beforehand.
Pair(notaryId, null) Pair(notaryId, null)

View File

@ -6,10 +6,10 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.CertificateChainCheckPolicy import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.parseAs import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import java.net.URL import java.net.URL
import java.nio.file.Path import java.nio.file.Path
import java.util.* import java.util.*
@ -55,11 +55,12 @@ fun NodeConfiguration.shouldCheckCheckpoints(): Boolean {
data class NotaryConfig(val validating: Boolean, data class NotaryConfig(val validating: Boolean,
val raft: RaftConfig? = null, val raft: RaftConfig? = null,
val bftSMaRt: BFTSMaRtConfiguration? = null, val bftSMaRt: BFTSMaRtConfiguration? = null,
val custom: Boolean = false val custom: Boolean = false,
val mysql: Properties? = null
) { ) {
init { init {
require(raft == null || bftSMaRt == null || !custom) { require(raft == null || bftSMaRt == null || !custom || mysql == null) {
"raft, bftSMaRt, and custom configs cannot be specified together" "raft, bftSMaRt, custom, and mysql configs cannot be specified together"
} }
} }
val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null

View File

@ -0,0 +1,52 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal
import java.security.PublicKey
import java.util.*
/** Notary service backed by a replicated MySQL database. */
abstract class MySQLNotaryService(
final override val services: ServiceHubInternal,
override val notaryIdentityKey: PublicKey,
dataSourceProperties: Properties,
/** Database table will be automatically created in dev mode */
val devMode: Boolean) : TrustedAuthorityNotaryService() {
override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = MySQLUniquenessProvider(
services.monitoringService.metrics,
dataSourceProperties
)
override fun start() {
if (devMode) uniquenessProvider.createTable()
}
override fun stop() {
uniquenessProvider.stop()
}
}
class MySQLNonValidatingNotaryService(services: ServiceHubInternal,
notaryIdentityKey: PublicKey,
dataSourceProperties: Properties,
devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, dataSourceProperties, devMode) {
companion object {
val id = constructId(validating = false, mysql = true)
}
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = NonValidatingNotaryFlow(otherPartySession, this)
}
class MySQLValidatingNotaryService(services: ServiceHubInternal,
notaryIdentityKey: PublicKey,
dataSourceProperties: Properties,
devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, dataSourceProperties, devMode) {
companion object {
val id = constructId(validating = true, mysql = true)
}
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = ValidatingNotaryFlow(otherPartySession, this)
}

View File

@ -0,0 +1,162 @@
package net.corda.node.services.transactions
import com.codahale.metrics.MetricRegistry
import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.services.UniquenessException
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import java.security.PublicKey
import java.sql.BatchUpdateException
import java.sql.Connection
import java.util.*
/**
* Uniqueness provider backed by a MySQL database. It is intended to be used with a multi-master synchronously replicated
* variant of MySQL, such as Percona XtraDB Cluster, or MariaDB Galera Cluster.
*
* Note that no ORM is used since we want to retain full control over table schema and be able to experiment with optimisations.
*/
class MySQLUniquenessProvider(
metrics: MetricRegistry,
dataSourceProperties: Properties
) : UniquenessProvider, SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<MySQLUniquenessProvider>()
// TODO: optimize table schema for InnoDB
private val createTableStatement =
"CREATE TABLE IF NOT EXISTS committed_states (" +
"issue_tx_id BINARY(32) NOT NULL," +
"issue_tx_output_id INT NOT NULL," +
"consuming_tx_id BINARY(32) NOT NULL," +
"consuming_tx_input_id INT UNSIGNED NOT NULL," +
"consuming_party_name TEXT NOT NULL," +
// TODO: do we need to store the key? X500 name should be sufficient
"consuming_party_key BLOB NOT NULL," +
"commit_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP," +
"CONSTRAINT id PRIMARY KEY (issue_tx_id, issue_tx_output_id)" +
")"
private val insertStatement = "INSERT INTO committed_states (issue_tx_id, issue_tx_output_id, consuming_tx_id, consuming_tx_input_id, consuming_party_name, consuming_party_key) VALUES (?, ?, ?, ?, ?, ?)"
private val findStatement = "SELECT consuming_tx_id, consuming_tx_input_id, consuming_party_name, consuming_party_key FROM committed_states WHERE issue_tx_id = ? AND issue_tx_output_id = ?"
}
private val metricPrefix = MySQLUniquenessProvider::class.simpleName
/** Transaction commit duration and rate metric timer */
private val commitTimer = metrics.timer("$metricPrefix.Commit")
/**
* When writing to multiple masters with Galera, transaction rollbacks may happen due to high write contention.
* This is a useful heath metric.
*/
private val rollbackCounter = metrics.counter("$metricPrefix.Rollback")
/** Track double spend attempts. Note that this will also include notarisation retries. */
private val conflictCounter = metrics.counter("$metricPrefix.Conflicts")
val dataSource = HikariDataSource(HikariConfig(dataSourceProperties))
private val connection: Connection
get() = dataSource.connection
fun createTable() {
log.debug("Attempting to create DB table if it does not yet exist: $createTableStatement")
connection.use {
it.createStatement().execute(createTableStatement)
it.commit()
}
}
fun stop() {
dataSource.close()
}
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
val timer = commitTimer.time()
try {
retryTransaction(CommitAll(states, txId, callerIdentity))
} catch (e: BatchUpdateException) {
log.info("Unable to commit input states, finding conflicts", e)
conflictCounter.inc()
retryTransaction(FindConflicts(states))
} finally {
timer.stop()
}
}
private fun retryTransaction(tx: RetryableTransaction) {
connection.use {
while (true) {
try {
tx.run(it)
} catch (e: Exception) {
it.rollback()
if (e is MySQLTransactionRollbackException) {
log.warn("Rollback exception occurred, retrying", e)
rollbackCounter.inc()
continue
} else {
throw e
}
}
break
}
it.commit()
}
}
interface RetryableTransaction {
fun run(conn: Connection)
}
private class CommitAll(val states: List<StateRef>, val txId: SecureHash, val callerIdentity: Party) : RetryableTransaction {
override fun run(conn: Connection) {
conn.prepareStatement(insertStatement).apply {
states.forEachIndexed { index, stateRef ->
// StateRef
setBytes(1, stateRef.txhash.bytes)
setInt(2, stateRef.index)
// Consuming transaction
setBytes(3, txId.bytes)
setInt(4, index)
setString(5, callerIdentity.name.toString())
setBytes(6, callerIdentity.owningKey.serialize().bytes)
addBatch()
clearParameters()
}
executeBatch()
close()
}
}
}
private class FindConflicts(val states: List<StateRef>) : RetryableTransaction {
override fun run(conn: Connection) {
val conflicts = mutableMapOf<StateRef, UniquenessProvider.ConsumingTx>()
states.forEach {
val st = conn.prepareStatement(findStatement).apply {
setBytes(1, it.txhash.bytes)
setInt(2, it.index)
}
val result = st.executeQuery()
if (result.next()) {
val consumingTxId = SecureHash.SHA256(result.getBytes(1))
val inputIndex = result.getInt(2)
val partyName = CordaX500Name.parse(result.getString(3))
val partyKey: PublicKey = result.getBytes(4).deserialize()
conflicts[it] = UniquenessProvider.ConsumingTx(consumingTxId, inputIndex, Party(partyName, partyKey))
}
}
conn.commit()
if (conflicts.isNotEmpty()) throw UniquenessException(UniquenessProvider.Conflict(conflicts))
}
}
}