From e57dab6fcc1a650ddeb9de84ac9d605bb1ec97b4 Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Mon, 18 Dec 2017 09:47:07 +0000 Subject: [PATCH] ENT-1232: Add MySQL notary service (#187) * Add MySQL notary service. This is the first iteration of a HA notary service, and is intended to replace the Raft notary. The backing database should be a replicated MySQL implementation such as Percona XtraDB Cluster. Handle MySQLTransactionRollbackException Add the `mysql-connector-java` to the build file of the node package. Add Hikari connection pool * Store requesting party name & key in separate columns (for consistency with existing uniqueness providers) * Add idempotency test and additional counters --- .../corda/core/node/services/NotaryService.kt | 12 +- node/build.gradle | 2 + .../node/services/MySQLNotaryServiceTests.kt | 154 +++++++++++++++++ .../net/corda/node/internal/AbstractNode.kt | 23 ++- .../node/services/config/NodeConfiguration.kt | 11 +- .../transactions/MySQLNotaryService.kt | 52 ++++++ .../transactions/MySQLUniquenessProvider.kt | 162 ++++++++++++++++++ 7 files changed, 400 insertions(+), 16 deletions(-) create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/MySQLNotaryServiceTests.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/transactions/MySQLNotaryService.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt diff --git a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt index 46227d0027..0c928a7b59 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt @@ -16,13 +16,21 @@ import java.security.PublicKey abstract class NotaryService : SingletonSerializeAsToken() { companion object { const val ID_PREFIX = "corda.notary." - fun constructId(validating: Boolean, raft: Boolean = false, bft: Boolean = false, custom: Boolean = false): String { - require(Booleans.countTrue(raft, bft, custom) <= 1) { "At most one of raft, bft or custom may be true" } + @JvmOverloads + fun constructId( + validating: Boolean, + raft: Boolean = false, + bft: Boolean = false, + custom: Boolean = false, + mysql: Boolean = false + ): String { + require(Booleans.countTrue(raft, bft, custom, mysql) <= 1) { "At most one of raft, bft, mysql or custom may be true" } return StringBuffer(ID_PREFIX).apply { append(if (validating) "validating" else "simple") if (raft) append(".raft") if (bft) append(".bft") if (custom) append(".custom") + if (mysql) append(".mysql") }.toString() } } diff --git a/node/build.gradle b/node/build.gradle index f6cbaf4eb6..361c2b3333 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -127,6 +127,8 @@ dependencies { compile "org.postgresql:postgresql:$postgresql_version" //For Azure SQL and SQL Server support in persistence compile 'com.microsoft.sqlserver:mssql-jdbc:6.2.1.jre8' + // For the MySQLUniquenessProvider + compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6' // SQL connection pooling library compile "com.zaxxer:HikariCP:2.5.1" diff --git a/node/src/integration-test/kotlin/net/corda/node/services/MySQLNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/MySQLNotaryServiceTests.kt new file mode 100644 index 0000000000..f9e1f16b61 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/MySQLNotaryServiceTests.kt @@ -0,0 +1,154 @@ +package net.corda.node.services + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.TransactionSignature +import net.corda.core.flows.NotaryError +import net.corda.core.flows.NotaryException +import net.corda.core.flows.NotaryFlow +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.StartedNode +import net.corda.node.services.config.NotaryConfig +import net.corda.nodeapi.internal.NetworkParametersCopier +import net.corda.nodeapi.internal.NotaryInfo +import net.corda.nodeapi.internal.ServiceIdentityGenerator +import net.corda.testing.* +import net.corda.testing.common.internal.testNetworkParameters +import net.corda.testing.contracts.DummyContract +import net.corda.testing.node.MockNetwork +import net.corda.testing.node.MockNodeParameters +import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.testing.node.startFlow +import org.junit.After +import org.junit.Before +import org.junit.ClassRule +import org.junit.Test +import java.math.BigInteger +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class MySQLNotaryServiceTests : IntegrationTest() { + companion object { + val notaryName = CordaX500Name("MySQL Notary Service", "Zurich", "CH") + @ClassRule + @JvmField + val databaseSchemas = IntegrationTestSchemas("node_0", DUMMY_NOTARY_NAME.toDatabaseSchemaName()) + } + + private lateinit var mockNet: MockNetwork + private lateinit var node: StartedNode + private lateinit var notaryParty: Party + private lateinit var notaryNode: StartedNode + + @Before + fun before() { + mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts")) + notaryParty = ServiceIdentityGenerator.generateToDisk( + listOf(mockNet.baseDirectory(mockNet.nextNodeId)), + notaryName, + "identity" + ) + val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryParty, false)))) + val notaryNodeUnstarted = createNotaryNode() + val nodeUnstarted = mockNet.createUnstartedNode() + + val startedNodes = listOf(notaryNodeUnstarted, nodeUnstarted).map { n -> + networkParameters.install(mockNet.baseDirectory(n.id)) + n.start() + } + notaryNode = startedNodes.first() + node = startedNodes.last() + } + + @After + fun stopNodes() { + mockNet.stopNodes() + } + + @Test + fun `detect double spend`() { + val inputState = issueState(node, notaryParty) + + val firstTxBuilder = TransactionBuilder(notaryParty) + .addInputState(inputState) + .addCommand(dummyCommand(node.services.myInfo.chooseIdentity().owningKey)) + val firstSpendTx = node.services.signInitialTransaction(firstTxBuilder) + + val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx)) + mockNet.runNetwork() + + firstSpend.resultFuture.getOrThrow() + + val secondSpendBuilder = TransactionBuilder(notaryParty).withItems(inputState).run { + val dummyState = DummyContract.SingleOwnerState(0, node.info.chooseIdentity()) + addOutputState(dummyState, DummyContract.PROGRAM_ID) + addCommand(dummyCommand(node.services.myInfo.chooseIdentity().owningKey)) + this + } + val secondSpendTx = node.services.signInitialTransaction(secondSpendBuilder) + val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx)) + + mockNet.runNetwork() + + val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() } + val error = ex.error as NotaryError.Conflict + assertEquals(error.txId, secondSpendTx.id) + } + + @Test + fun `notarisations are idempotent`() { + val inputState = issueState(node, notaryParty) + + val txBuilder = TransactionBuilder(notaryParty) + .addInputState(inputState) + .addCommand(dummyCommand(node.services.myInfo.chooseIdentity().owningKey)) + val spendTx = node.services.signInitialTransaction(txBuilder) + + val notarise = node.services.startFlow(NotaryFlow.Client(spendTx)) + mockNet.runNetwork() + val signature = notarise.resultFuture.get().single() + + val notariseRetry = node.services.startFlow(NotaryFlow.Client(spendTx)) + mockNet.runNetwork() + val signatureRetry = notariseRetry.resultFuture.get().single() + + fun checkSignature(signature: TransactionSignature) { + signature.verify(spendTx.id) + assertEquals(notaryParty.owningKey, signature.by) + } + + checkSignature(signature) + checkSignature(signatureRetry) + } + + private fun createNotaryNode(): MockNetwork.MockNode { + val dataStoreProperties = makeTestDataSourceProperties().apply { + setProperty("autoCommit", "false") + } + return mockNet.createUnstartedNode( + MockNodeParameters( + legalName = notaryName, + entropyRoot = BigInteger.valueOf(60L), + configOverrides = { + val notaryConfig = NotaryConfig(validating = false, mysql = dataStoreProperties) + doReturn(notaryConfig).whenever(it).notary + } + ) + ) + } + + private fun issueState(node: StartedNode<*>, notary: Party): StateAndRef<*> { + return node.database.transaction { + val builder = DummyContract.generateInitial(Random().nextInt(), notary, node.info.chooseIdentity().ref(0)) + val stx = node.services.signInitialTransaction(builder) + node.services.recordTransactions(stx) + StateAndRef(builder.outputStates().first(), StateRef(stx.id, 0)) + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 790ccfbf2d..89c23d2af9 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -668,14 +668,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration, private fun makeCoreNotaryService(notaryConfig: NotaryConfig, database: CordaPersistence): NotaryService { val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service") return notaryConfig.run { - if (raft != null) { - val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft) - (if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider) - } else if (bftSMaRt != null) { - if (validating) throw IllegalArgumentException("Validating BFTSMaRt notary not supported") - BFTNonValidatingNotaryService(services, notaryKey, bftSMaRt, makeBFTCluster(notaryKey, bftSMaRt)) - } else { - (if (validating) ::ValidatingNotaryService else ::SimpleNotaryService)(services, notaryKey) + when { + raft != null -> { + val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft) + (if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider) + } + bftSMaRt != null -> { + if (validating) throw IllegalArgumentException("Validating BFTSMaRt notary not supported") + BFTNonValidatingNotaryService(services, notaryKey, bftSMaRt, makeBFTCluster(notaryKey, bftSMaRt)) + } + mysql != null -> { + (if (validating) ::MySQLValidatingNotaryService else ::MySQLNonValidatingNotaryService)(services, notaryKey, mysql, configuration.devMode) + } + else -> (if (validating) ::ValidatingNotaryService else ::SimpleNotaryService)(services, notaryKey) } } } @@ -725,7 +730,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, Pair("identity", myLegalName) } else { val notaryId = notaryConfig.run { - NotaryService.constructId(validating, raft != null, bftSMaRt != null, custom) + NotaryService.constructId(validating, raft != null, bftSMaRt != null, custom, mysql != null) } // The node is part of a distributed notary whose identity must already be generated beforehand. Pair(notaryId, null) diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index c08b81efce..82518d14de 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -6,10 +6,10 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.seconds import net.corda.node.services.messaging.CertificateChainCheckPolicy -import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.config.NodeSSLConfiguration +import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.config.parseAs +import net.corda.nodeapi.internal.persistence.DatabaseConfig import java.net.URL import java.nio.file.Path import java.util.* @@ -55,11 +55,12 @@ fun NodeConfiguration.shouldCheckCheckpoints(): Boolean { data class NotaryConfig(val validating: Boolean, val raft: RaftConfig? = null, val bftSMaRt: BFTSMaRtConfiguration? = null, - val custom: Boolean = false + val custom: Boolean = false, + val mysql: Properties? = null ) { init { - require(raft == null || bftSMaRt == null || !custom) { - "raft, bftSMaRt, and custom configs cannot be specified together" + require(raft == null || bftSMaRt == null || !custom || mysql == null) { + "raft, bftSMaRt, custom, and mysql configs cannot be specified together" } } val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLNotaryService.kt new file mode 100644 index 0000000000..0bf810b2ab --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLNotaryService.kt @@ -0,0 +1,52 @@ +package net.corda.node.services.transactions + +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.node.services.TimeWindowChecker +import net.corda.core.node.services.TrustedAuthorityNotaryService +import net.corda.node.services.api.ServiceHubInternal +import java.security.PublicKey +import java.util.* + +/** Notary service backed by a replicated MySQL database. */ +abstract class MySQLNotaryService( + final override val services: ServiceHubInternal, + override val notaryIdentityKey: PublicKey, + dataSourceProperties: Properties, + /** Database table will be automatically created in dev mode */ + val devMode: Boolean) : TrustedAuthorityNotaryService() { + + override val timeWindowChecker = TimeWindowChecker(services.clock) + override val uniquenessProvider = MySQLUniquenessProvider( + services.monitoringService.metrics, + dataSourceProperties + ) + + override fun start() { + if (devMode) uniquenessProvider.createTable() + } + + override fun stop() { + uniquenessProvider.stop() + } +} + +class MySQLNonValidatingNotaryService(services: ServiceHubInternal, + notaryIdentityKey: PublicKey, + dataSourceProperties: Properties, + devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, dataSourceProperties, devMode) { + companion object { + val id = constructId(validating = false, mysql = true) + } + override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = NonValidatingNotaryFlow(otherPartySession, this) +} + +class MySQLValidatingNotaryService(services: ServiceHubInternal, + notaryIdentityKey: PublicKey, + dataSourceProperties: Properties, + devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, dataSourceProperties, devMode) { + companion object { + val id = constructId(validating = true, mysql = true) + } + override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = ValidatingNotaryFlow(otherPartySession, this) +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt new file mode 100644 index 0000000000..33fe84d79c --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt @@ -0,0 +1,162 @@ +package net.corda.node.services.transactions + +import com.codahale.metrics.MetricRegistry +import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.node.services.UniquenessException +import net.corda.core.node.services.UniquenessProvider +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.loggerFor +import java.security.PublicKey +import java.sql.BatchUpdateException +import java.sql.Connection +import java.util.* + +/** + * Uniqueness provider backed by a MySQL database. It is intended to be used with a multi-master synchronously replicated + * variant of MySQL, such as Percona XtraDB Cluster, or MariaDB Galera Cluster. + * + * Note that no ORM is used since we want to retain full control over table schema and be able to experiment with optimisations. + */ +class MySQLUniquenessProvider( + metrics: MetricRegistry, + dataSourceProperties: Properties +) : UniquenessProvider, SingletonSerializeAsToken() { + companion object { + private val log = loggerFor() + + // TODO: optimize table schema for InnoDB + private val createTableStatement = + "CREATE TABLE IF NOT EXISTS committed_states (" + + "issue_tx_id BINARY(32) NOT NULL," + + "issue_tx_output_id INT NOT NULL," + + "consuming_tx_id BINARY(32) NOT NULL," + + "consuming_tx_input_id INT UNSIGNED NOT NULL," + + "consuming_party_name TEXT NOT NULL," + + // TODO: do we need to store the key? X500 name should be sufficient + "consuming_party_key BLOB NOT NULL," + + "commit_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP," + + "CONSTRAINT id PRIMARY KEY (issue_tx_id, issue_tx_output_id)" + + ")" + private val insertStatement = "INSERT INTO committed_states (issue_tx_id, issue_tx_output_id, consuming_tx_id, consuming_tx_input_id, consuming_party_name, consuming_party_key) VALUES (?, ?, ?, ?, ?, ?)" + private val findStatement = "SELECT consuming_tx_id, consuming_tx_input_id, consuming_party_name, consuming_party_key FROM committed_states WHERE issue_tx_id = ? AND issue_tx_output_id = ?" + } + + private val metricPrefix = MySQLUniquenessProvider::class.simpleName + /** Transaction commit duration and rate metric timer */ + private val commitTimer = metrics.timer("$metricPrefix.Commit") + /** + * When writing to multiple masters with Galera, transaction rollbacks may happen due to high write contention. + * This is a useful heath metric. + */ + private val rollbackCounter = metrics.counter("$metricPrefix.Rollback") + /** Track double spend attempts. Note that this will also include notarisation retries. */ + private val conflictCounter = metrics.counter("$metricPrefix.Conflicts") + + val dataSource = HikariDataSource(HikariConfig(dataSourceProperties)) + + private val connection: Connection + get() = dataSource.connection + + fun createTable() { + log.debug("Attempting to create DB table if it does not yet exist: $createTableStatement") + connection.use { + it.createStatement().execute(createTableStatement) + it.commit() + } + } + + fun stop() { + dataSource.close() + } + + override fun commit(states: List, txId: SecureHash, callerIdentity: Party) { + val timer = commitTimer.time() + try { + retryTransaction(CommitAll(states, txId, callerIdentity)) + } catch (e: BatchUpdateException) { + log.info("Unable to commit input states, finding conflicts", e) + conflictCounter.inc() + retryTransaction(FindConflicts(states)) + } finally { + timer.stop() + } + } + + private fun retryTransaction(tx: RetryableTransaction) { + connection.use { + while (true) { + try { + tx.run(it) + } catch (e: Exception) { + it.rollback() + if (e is MySQLTransactionRollbackException) { + log.warn("Rollback exception occurred, retrying", e) + rollbackCounter.inc() + continue + } else { + throw e + } + } + break + } + it.commit() + } + } + + interface RetryableTransaction { + fun run(conn: Connection) + } + + private class CommitAll(val states: List, val txId: SecureHash, val callerIdentity: Party) : RetryableTransaction { + override fun run(conn: Connection) { + conn.prepareStatement(insertStatement).apply { + states.forEachIndexed { index, stateRef -> + // StateRef + setBytes(1, stateRef.txhash.bytes) + setInt(2, stateRef.index) + // Consuming transaction + setBytes(3, txId.bytes) + setInt(4, index) + setString(5, callerIdentity.name.toString()) + setBytes(6, callerIdentity.owningKey.serialize().bytes) + + addBatch() + clearParameters() + } + executeBatch() + close() + } + } + } + + private class FindConflicts(val states: List) : RetryableTransaction { + override fun run(conn: Connection) { + val conflicts = mutableMapOf() + states.forEach { + val st = conn.prepareStatement(findStatement).apply { + setBytes(1, it.txhash.bytes) + setInt(2, it.index) + } + val result = st.executeQuery() + + if (result.next()) { + val consumingTxId = SecureHash.SHA256(result.getBytes(1)) + val inputIndex = result.getInt(2) + val partyName = CordaX500Name.parse(result.getString(3)) + val partyKey: PublicKey = result.getBytes(4).deserialize() + conflicts[it] = UniquenessProvider.ConsumingTx(consumingTxId, inputIndex, Party(partyName, partyKey)) + } + } + conn.commit() + if (conflicts.isNotEmpty()) throw UniquenessException(UniquenessProvider.Conflict(conflicts)) + } + } +} \ No newline at end of file