diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryConfiguration.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryConfiguration.kt new file mode 100644 index 0000000000..2ad082dfdf --- /dev/null +++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryConfiguration.kt @@ -0,0 +1,9 @@ +package net.corda.notary.jpa + +data class JPANotaryConfiguration( + val batchSize: Int = 128, + val batchTimeoutMs: Long = 1L, + val maxInputStates: Int = 2000, + val maxDBTransactionRetryCount: Int = 10, + val backOffBaseMs: Long = 20L +) \ No newline at end of file diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt index a1051bdcde..549b6f81a6 100644 --- a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt +++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt @@ -6,6 +6,7 @@ import net.corda.core.internal.notary.NotaryServiceFlow import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow +import net.corda.nodeapi.internal.config.parseAs import java.security.PublicKey /** Notary service backed by a replicated MySQL database. */ @@ -17,7 +18,12 @@ class JPANotaryService( ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") override val asyncUniquenessProvider = with(services) { - JPAUniquenessProvider(services.clock, services.database, notaryConfig) + val jpaNotaryConfig = try { + notaryConfig.extraConfig!!.parseAs() + } catch (e: Exception) { + throw IllegalArgumentException("Failed to register ${JPANotaryService::class.java}: JPA notary configuration not present") + } + JPAUniquenessProvider(services.clock, services.database, jpaNotaryConfig) } override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow { diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt index ac3d3cd2a4..d4a3104bd5 100644 --- a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt +++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt @@ -12,14 +12,16 @@ import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.notary.* +import net.corda.core.internal.notary.AsyncUniquenessProvider +import net.corda.core.internal.notary.NotaryInternalException +import net.corda.core.internal.notary.isConsumedByTheSameTx +import net.corda.core.internal.notary.validateTimeWindow import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug -import net.corda.node.services.config.NotaryConfig import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.serialization.internal.CordaSerializationEncoding @@ -36,7 +38,7 @@ import kotlin.concurrent.thread /** A JPA backed Uniqueness provider */ @ThreadSafe -class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val notaryConfig: NotaryConfig) : AsyncUniquenessProvider, SingletonSerializeAsToken() { +class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val config: JPANotaryConfiguration) : AsyncUniquenessProvider, SingletonSerializeAsToken() { // TODO: test vs. MySQLUniquenessProvider @@ -49,7 +51,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va @CordaSerializable class Request( @Id - @Column(nullable = true, length=76) + @Column(nullable = true, length = 76) var id: String? = null, @Column(name = "consuming_transaction_id", nullable = true, length = 64) @@ -79,7 +81,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_committed_states") - @NamedQuery(name = "CommittedState.select", query="SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids") + @NamedQuery(name = "CommittedState.select", query = "SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids") class CommittedState( @Id @Column(name = "state_ref", length = 73) @@ -96,7 +98,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va try { val buffer = LinkedList() while (!Thread.interrupted()) { - val drainedSize = Queues.drain(requestQueue, buffer, notaryConfig.batchSize, notaryConfig.batchTimeoutMs, TimeUnit.MILLISECONDS) + val drainedSize = Queues.drain(requestQueue, buffer, config.batchSize, config.batchTimeoutMs, TimeUnit.MILLISECONDS) if (drainedSize == 0) continue processRequests(buffer) buffer.clear() @@ -115,7 +117,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va private const val jdbcBatchSize = 100_000 private val log = contextLogger() - fun encodeStateRef(s: StateRef): String { + fun encodeStateRef(s: StateRef): String { return s.txhash.toString() + ":" + s.index.toString(16) } @@ -173,7 +175,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va val ids = (states + references).map { encodeStateRef(it) }.toSet() val committedStates = mutableListOf() - for (idsBatch in ids.chunked(notaryConfig.maxInputStates)) { + for (idsBatch in ids.chunked(config.maxInputStates)) { @SuppressWarnings("unchecked") val existing = session.createNamedQuery("CommittedState.select").setParameter("ids", idsBatch).resultList as List committedStates.addAll(existing) @@ -184,7 +186,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va val consumingTxId = SecureHash.parse(it.consumingTxHash) if (stateRef in references) { stateRef to StateConsumptionDetails(hashOfTransactionId = consumingTxId, type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) - } else { + } else { stateRef to StateConsumptionDetails(consumingTxId.sha256()) } }.toMap() @@ -192,8 +194,8 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va private fun withRetry(block: () -> Unit) { var retryCount = 0 - var backOff = notaryConfig.backOffBaseMs - while (retryCount < notaryConfig.maxDBTransactionRetryCount) { + var backOff = config.backOffBaseMs + while (retryCount < config.maxDBTransactionRetryCount) { try { block() break @@ -263,7 +265,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va } } } - } catch(e: Exception) { + } catch (e: Exception) { log.warn("Error processing commit requests", e) for (request in requests) { respondWithError(request, e) @@ -272,10 +274,10 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va } private fun respondWithError(request: CommitRequest, exception: Exception) { - if (exception is NotaryInternalException) { - request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error)) - } else { - request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) - } + if (exception is NotaryInternalException) { + request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error)) + } else { + request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) + } } } diff --git a/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt b/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt index 4fa10a59b1..6e4255482e 100644 --- a/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt +++ b/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt @@ -10,10 +10,11 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.internal.notary.NotaryInternalException import net.corda.node.services.config.NotaryConfig import net.corda.node.services.schema.NodeSchemaService -import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef -import net.corda.notary.jpa.JPAUniquenessProvider.Companion.encodeStateRef +import net.corda.nodeapi.internal.config.toConfig import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef +import net.corda.notary.jpa.JPAUniquenessProvider.Companion.encodeStateRef import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity import net.corda.testing.core.generateStateRef @@ -35,7 +36,8 @@ class JPAUniquenessProviderTests { private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party private val txID = SecureHash.randomSHA256() private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0) - private val notaryConfig = NotaryConfig(validating=false, maxInputStates = 10) + private val notaryConfig = JPANotaryConfiguration(maxInputStates = 10) + private lateinit var database: CordaPersistence @@ -53,10 +55,10 @@ class JPAUniquenessProviderTests { @Test fun `should commit a transaction with unused inputs without exception`() { - val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig) - val inputState = generateStateRef() + val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig) + val inputState = generateStateRef() - provider.commit(listOf(inputState), txID, identity, requestSignature) + provider.commit(listOf(inputState), txID, identity, requestSignature) } @Test @@ -86,7 +88,7 @@ class JPAUniquenessProviderTests { @Test fun `all conflicts are found with batching`() { - val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates/2 + val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates / 2 val stateRefs = (1..nrStates).map { generateStateRef() } println(stateRefs.size) val firstTxId = SecureHash.randomSHA256() diff --git a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryConfiguration.kt b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryConfiguration.kt new file mode 100644 index 0000000000..3bf0e42c8c --- /dev/null +++ b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryConfiguration.kt @@ -0,0 +1,34 @@ +package net.corda.notary.mysql + +import java.util.* + +data class MySQLNotaryConfiguration( + val dataSource: Properties, + /** + * Number of times to attempt to reconnect to the database. + */ + val connectionRetries: Int = 2, // Default value for a 3 server cluster. + /** + * Time increment between re-connection attempts. + * + * The total back-off duration is calculated as: backOffIncrement * backOffBase ^ currentRetryCount + */ + val backOffIncrement: Int = 500, + /** Exponential back-off multiplier base. */ + val backOffBase: Double = 1.5, + /** The maximum number of transactions processed in a single batch. */ + val maxBatchSize: Int = 500, + /** The maximum combined number of input states processed in a single batch. */ + val maxBatchInputStates: Int = 10_000, + /** A batch will be processed after a specified timeout even if it has not yet reached full capacity. */ + val batchTimeoutMs: Long = 200, + /** + * The maximum number of commit requests in flight. Once the capacity is reached the service will block on + * further commit requests. + */ + val maxQueueSize: Int = 100_000 +) { + init { + require(connectionRetries >= 0) { "connectionRetries cannot be negative" } + } +} diff --git a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryService.kt b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryService.kt index 765f79e3bd..65d7f7a731 100644 --- a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryService.kt +++ b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryService.kt @@ -6,6 +6,7 @@ import net.corda.core.internal.notary.NotaryServiceFlow import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow +import net.corda.nodeapi.internal.config.parseAs import java.security.PublicKey /** Notary service backed by a replicated MySQL database. */ @@ -20,8 +21,11 @@ class MySQLNotaryService( ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") override val asyncUniquenessProvider = with(services) { - val mysqlConfig = notaryConfig.mysql - ?: throw IllegalArgumentException("Failed to register ${this::class.java}: raft configuration not present") + val mysqlConfig = try { + notaryConfig.extraConfig!!.parseAs() + } catch (e: Exception) { + throw IllegalArgumentException("Failed to register ${MySQLNotaryService::class.java}: mysql configuration not present") + } MySQLUniquenessProvider( services.monitoringService.metrics, services.clock, diff --git a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt index 537d8d276a..8bc01fd3f6 100644 --- a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt +++ b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt @@ -30,7 +30,6 @@ import net.corda.core.serialization.serialize import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace -import net.corda.node.services.config.MySQLConfiguration import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY import java.sql.* import java.time.Clock @@ -49,7 +48,7 @@ import kotlin.concurrent.thread class MySQLUniquenessProvider( metrics: MetricRegistry, val clock: Clock, - val config: MySQLConfiguration + val config: MySQLNotaryConfiguration ) : AsyncUniquenessProvider, SingletonSerializeAsToken() { companion object { private val log = loggerFor() diff --git a/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt b/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt index 7a05f3e2d8..ee394f92d4 100644 --- a/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt +++ b/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt @@ -22,9 +22,9 @@ import net.corda.core.node.NotaryInfo import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds -import net.corda.node.services.config.MySQLConfiguration import net.corda.node.services.config.NotaryConfig import net.corda.nodeapi.internal.DevIdentityGenerator +import net.corda.nodeapi.internal.config.toConfig import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.contracts.DummyContract @@ -256,7 +256,8 @@ class MySQLNotaryServiceTests : IntegrationTest() { configOverrides = { val notaryConfig = NotaryConfig( validating = true, - mysql = MySQLConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100), + extraConfig = MySQLNotaryConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100).toConfig(), + serviceLegalName = notaryName, className = MySQLNotaryService::class.java.name ) doReturn(notaryConfig).whenever(it).notary diff --git a/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt b/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt index 6567514cae..78ec3ce6f7 100644 --- a/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt +++ b/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt @@ -12,9 +12,9 @@ import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.services.config.ConfigHelper -import net.corda.node.services.config.MySQLConfiguration import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.nodeapi.internal.config.parseAs +import net.corda.notary.mysql.MySQLNotaryConfiguration import net.corda.notary.mysql.MySQLUniquenessProvider import net.corda.notarytest.flows.AsyncLoadTestFlow import java.net.InetAddress @@ -61,7 +61,7 @@ class JDBCNotaryService(override val services: AppServiceHub, override val notar } private fun createUniquenessProvider(): MySQLUniquenessProvider { - val mysqlConfig = appConfig.getConfig("mysql").parseAs() + val mysqlConfig = appConfig.getConfig("mysql").parseAs() return MySQLUniquenessProvider(createMetricsRegistry(), services.clock, mysqlConfig) } }