CORDA-535: Move MySQL & JPA notary configurations out of node

This commit is contained in:
Andrius Dagys 2018-10-22 15:45:40 +01:00
parent 75c9c50abe
commit 29443bf7fe
9 changed files with 90 additions and 33 deletions

View File

@ -0,0 +1,9 @@
package net.corda.notary.jpa
data class JPANotaryConfiguration(
val batchSize: Int = 128,
val batchTimeoutMs: Long = 1L,
val maxInputStates: Int = 2000,
val maxDBTransactionRetryCount: Int = 10,
val backOffBaseMs: Long = 20L
)

View File

@ -6,6 +6,7 @@ import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow
import net.corda.nodeapi.internal.config.parseAs
import java.security.PublicKey
/** Notary service backed by a replicated MySQL database. */
@ -17,7 +18,12 @@ class JPANotaryService(
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
override val asyncUniquenessProvider = with(services) {
JPAUniquenessProvider(services.clock, services.database, notaryConfig)
val jpaNotaryConfig = try {
notaryConfig.extraConfig!!.parseAs<JPANotaryConfiguration>()
} catch (e: Exception) {
throw IllegalArgumentException("Failed to register ${JPANotaryService::class.java}: JPA notary configuration not present")
}
JPAUniquenessProvider(services.clock, services.database, jpaNotaryConfig)
}
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {

View File

@ -12,14 +12,16 @@ import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.*
import net.corda.core.internal.notary.AsyncUniquenessProvider
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.serialization.internal.CordaSerializationEncoding
@ -36,7 +38,7 @@ import kotlin.concurrent.thread
/** A JPA backed Uniqueness provider */
@ThreadSafe
class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val notaryConfig: NotaryConfig) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val config: JPANotaryConfiguration) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
// TODO: test vs. MySQLUniquenessProvider
@ -49,7 +51,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
@CordaSerializable
class Request(
@Id
@Column(nullable = true, length=76)
@Column(nullable = true, length = 76)
var id: String? = null,
@Column(name = "consuming_transaction_id", nullable = true, length = 64)
@ -79,7 +81,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_committed_states")
@NamedQuery(name = "CommittedState.select", query="SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids")
@NamedQuery(name = "CommittedState.select", query = "SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids")
class CommittedState(
@Id
@Column(name = "state_ref", length = 73)
@ -96,7 +98,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
try {
val buffer = LinkedList<CommitRequest>()
while (!Thread.interrupted()) {
val drainedSize = Queues.drain(requestQueue, buffer, notaryConfig.batchSize, notaryConfig.batchTimeoutMs, TimeUnit.MILLISECONDS)
val drainedSize = Queues.drain(requestQueue, buffer, config.batchSize, config.batchTimeoutMs, TimeUnit.MILLISECONDS)
if (drainedSize == 0) continue
processRequests(buffer)
buffer.clear()
@ -115,7 +117,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
private const val jdbcBatchSize = 100_000
private val log = contextLogger()
fun encodeStateRef(s: StateRef): String {
fun encodeStateRef(s: StateRef): String {
return s.txhash.toString() + ":" + s.index.toString(16)
}
@ -173,7 +175,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
val ids = (states + references).map { encodeStateRef(it) }.toSet()
val committedStates = mutableListOf<CommittedState>()
for (idsBatch in ids.chunked(notaryConfig.maxInputStates)) {
for (idsBatch in ids.chunked(config.maxInputStates)) {
@SuppressWarnings("unchecked")
val existing = session.createNamedQuery("CommittedState.select").setParameter("ids", idsBatch).resultList as List<CommittedState>
committedStates.addAll(existing)
@ -184,7 +186,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
val consumingTxId = SecureHash.parse(it.consumingTxHash)
if (stateRef in references) {
stateRef to StateConsumptionDetails(hashOfTransactionId = consumingTxId, type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
} else {
} else {
stateRef to StateConsumptionDetails(consumingTxId.sha256())
}
}.toMap()
@ -192,8 +194,8 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
private fun withRetry(block: () -> Unit) {
var retryCount = 0
var backOff = notaryConfig.backOffBaseMs
while (retryCount < notaryConfig.maxDBTransactionRetryCount) {
var backOff = config.backOffBaseMs
while (retryCount < config.maxDBTransactionRetryCount) {
try {
block()
break
@ -263,7 +265,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
}
}
}
} catch(e: Exception) {
} catch (e: Exception) {
log.warn("Error processing commit requests", e)
for (request in requests) {
respondWithError(request, e)
@ -272,10 +274,10 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
}
private fun respondWithError(request: CommitRequest, exception: Exception) {
if (exception is NotaryInternalException) {
request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
} else {
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
}
if (exception is NotaryInternalException) {
request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
} else {
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
}
}
}

View File

@ -10,10 +10,11 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.schema.NodeSchemaService
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.encodeStateRef
import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.encodeStateRef
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.generateStateRef
@ -35,7 +36,8 @@ class JPAUniquenessProviderTests {
private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
private val txID = SecureHash.randomSHA256()
private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0)
private val notaryConfig = NotaryConfig(validating=false, maxInputStates = 10)
private val notaryConfig = JPANotaryConfiguration(maxInputStates = 10)
private lateinit var database: CordaPersistence
@ -53,10 +55,10 @@ class JPAUniquenessProviderTests {
@Test
fun `should commit a transaction with unused inputs without exception`() {
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
val inputState = generateStateRef()
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
val inputState = generateStateRef()
provider.commit(listOf(inputState), txID, identity, requestSignature)
provider.commit(listOf(inputState), txID, identity, requestSignature)
}
@Test
@ -86,7 +88,7 @@ class JPAUniquenessProviderTests {
@Test
fun `all conflicts are found with batching`() {
val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates/2
val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates / 2
val stateRefs = (1..nrStates).map { generateStateRef() }
println(stateRefs.size)
val firstTxId = SecureHash.randomSHA256()

View File

@ -0,0 +1,34 @@
package net.corda.notary.mysql
import java.util.*
data class MySQLNotaryConfiguration(
val dataSource: Properties,
/**
* Number of times to attempt to reconnect to the database.
*/
val connectionRetries: Int = 2, // Default value for a 3 server cluster.
/**
* Time increment between re-connection attempts.
*
* The total back-off duration is calculated as: backOffIncrement * backOffBase ^ currentRetryCount
*/
val backOffIncrement: Int = 500,
/** Exponential back-off multiplier base. */
val backOffBase: Double = 1.5,
/** The maximum number of transactions processed in a single batch. */
val maxBatchSize: Int = 500,
/** The maximum combined number of input states processed in a single batch. */
val maxBatchInputStates: Int = 10_000,
/** A batch will be processed after a specified timeout even if it has not yet reached full capacity. */
val batchTimeoutMs: Long = 200,
/**
* The maximum number of commit requests in flight. Once the capacity is reached the service will block on
* further commit requests.
*/
val maxQueueSize: Int = 100_000
) {
init {
require(connectionRetries >= 0) { "connectionRetries cannot be negative" }
}
}

View File

@ -6,6 +6,7 @@ import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow
import net.corda.nodeapi.internal.config.parseAs
import java.security.PublicKey
/** Notary service backed by a replicated MySQL database. */
@ -20,8 +21,11 @@ class MySQLNotaryService(
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
override val asyncUniquenessProvider = with(services) {
val mysqlConfig = notaryConfig.mysql
?: throw IllegalArgumentException("Failed to register ${this::class.java}: raft configuration not present")
val mysqlConfig = try {
notaryConfig.extraConfig!!.parseAs<MySQLNotaryConfiguration>()
} catch (e: Exception) {
throw IllegalArgumentException("Failed to register ${MySQLNotaryService::class.java}: mysql configuration not present")
}
MySQLUniquenessProvider(
services.monitoringService.metrics,
services.clock,

View File

@ -30,7 +30,6 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.services.config.MySQLConfiguration
import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY
import java.sql.*
import java.time.Clock
@ -49,7 +48,7 @@ import kotlin.concurrent.thread
class MySQLUniquenessProvider(
metrics: MetricRegistry,
val clock: Clock,
val config: MySQLConfiguration
val config: MySQLNotaryConfiguration
) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<MySQLUniquenessProvider>()

View File

@ -22,9 +22,9 @@ import net.corda.core.node.NotaryInfo
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.config.MySQLConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract
@ -256,7 +256,8 @@ class MySQLNotaryServiceTests : IntegrationTest() {
configOverrides = {
val notaryConfig = NotaryConfig(
validating = true,
mysql = MySQLConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100),
extraConfig = MySQLNotaryConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100).toConfig(),
serviceLegalName = notaryName,
className = MySQLNotaryService::class.java.name
)
doReturn(notaryConfig).whenever(it).notary

View File

@ -12,9 +12,9 @@ import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.MySQLConfiguration
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.nodeapi.internal.config.parseAs
import net.corda.notary.mysql.MySQLNotaryConfiguration
import net.corda.notary.mysql.MySQLUniquenessProvider
import net.corda.notarytest.flows.AsyncLoadTestFlow
import java.net.InetAddress
@ -61,7 +61,7 @@ class JDBCNotaryService(override val services: AppServiceHub, override val notar
}
private fun createUniquenessProvider(): MySQLUniquenessProvider {
val mysqlConfig = appConfig.getConfig("mysql").parseAs<MySQLConfiguration>()
val mysqlConfig = appConfig.getConfig("mysql").parseAs<MySQLNotaryConfiguration>()
return MySQLUniquenessProvider(createMetricsRegistry(), services.clock, mysqlConfig)
}
}