From b9ff958011f56dd5ebeb8b67177e26e0781b7f7e Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Fri, 2 Nov 2018 09:20:20 +0000 Subject: [PATCH] ENT-1858: All non-bft notary services are now async --- .../internal/notary/AsyncCFTNotaryService.kt | 77 ------------------- .../core/internal/notary/NotaryServiceFlow.kt | 3 +- .../net/corda/notary/jpa/JPANotaryService.kt | 8 +- .../corda/notary/jpa/JPAUniquenessProvider.kt | 24 +++--- .../notary/jpa/JPAUniquenessProviderTests.kt | 30 ++++---- .../corda/notary/mysql/MySQLNotaryService.kt | 10 +-- .../notary/mysql/MySQLUniquenessProvider.kt | 8 +- .../notary/mysql/MySQLNotaryServiceTests.kt | 6 +- .../notarytest/flows/AsyncLoadTestFlow.kt | 11 ++- .../corda/notarytest/flows/LoadTestFlow.kt | 67 ---------------- .../notarytest/service/JDBCNotaryService.kt | 10 +-- 11 files changed, 54 insertions(+), 200 deletions(-) delete mode 100644 core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt delete mode 100644 tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/LoadTestFlow.kt diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt deleted file mode 100644 index ea925d1def..0000000000 --- a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt +++ /dev/null @@ -1,77 +0,0 @@ -package net.corda.core.internal.notary - -import co.paralleluniverse.fibers.Suspendable -import net.corda.core.concurrent.CordaFuture -import net.corda.core.contracts.StateRef -import net.corda.core.contracts.TimeWindow -import net.corda.core.crypto.SecureHash -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.NotarisationRequestSignature -import net.corda.core.identity.Party -import net.corda.core.internal.FlowAsyncOperation -import net.corda.core.internal.executeAsync -import net.corda.core.internal.notary.AsyncUniquenessProvider.Result -import net.corda.core.serialization.CordaSerializable - -/** Base notary implementation for a notary that supports asynchronous calls from a flow. */ -abstract class AsyncCFTNotaryService : TrustedAuthorityNotaryService() { - override val uniquenessProvider: UniquenessProvider get() = asyncUniquenessProvider - /** A uniqueness provider that supports asynchronous commits. */ - protected abstract val asyncUniquenessProvider: AsyncUniquenessProvider - - /** - * @throws NotaryInternalException if any of the states have been consumed by a different transaction. - */ - @Suspendable - override fun commitInputStates( - inputs: List, - txId: SecureHash, - caller: Party, - requestSignature: NotarisationRequestSignature, - timeWindow: TimeWindow?, - references: List - ) { - // TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance - // when we concurrently log requests here as part of the flows, instead of logging sequentially in the - // `UniquenessProvider`. - val result = FlowLogic.currentTopLevel!!.executeAsync(AsyncCFTNotaryService.CommitOperation(this, inputs, txId, caller, requestSignature, timeWindow, references)) - if (result is Result.Failure) throw NotaryInternalException(result.error) - } - - /** - * Commits the provided input states asynchronously. - * - * If a consumed state conflict is reported by the [asyncUniquenessProvider], but it is caused by the same - * transaction – the transaction is getting notarised twice – a success response will be returned. - */ - private fun commitAsync( - states: List, - txId: SecureHash, - callerIdentity: Party, - requestSignature: NotarisationRequestSignature, - timeWindow: TimeWindow?, - references: List - ): CordaFuture { - return asyncUniquenessProvider.commitAsync(states, txId, callerIdentity, requestSignature, timeWindow, references) - } - - /** - * Required for the flow to be able to suspend until the commit is complete. - * This object will be included in the flow checkpoint. - */ - @CordaSerializable - class CommitOperation( - val service: AsyncCFTNotaryService, - val inputs: List, - val txId: SecureHash, - val caller: Party, - val requestSignature: NotarisationRequestSignature, - val timeWindow: TimeWindow?, - val references: List - ): FlowAsyncOperation { - override fun execute(deduplicationId: String): CordaFuture { - return service.commitAsync(inputs, txId, caller, requestSignature, timeWindow, references) - } - } -} - diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt index 938039e03d..6d3303aee5 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt @@ -6,6 +6,7 @@ import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.* import net.corda.core.identity.Party +import net.corda.core.internal.IdempotentFlow import net.corda.core.utilities.unwrap /** @@ -17,7 +18,7 @@ import net.corda.core.utilities.unwrap * Additional transaction validation logic can be added when implementing [validateRequest]. */ // See AbstractStateReplacementFlow.Acceptor for why it's Void? -abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic() { +abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic(), IdempotentFlow { companion object { // TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder. private const val maxAllowedInputsAndReferences = 10_000 diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt index 549b6f81a6..d4660a60fb 100644 --- a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt +++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt @@ -1,8 +1,8 @@ package net.corda.notary.jpa import net.corda.core.flows.FlowSession -import net.corda.core.internal.notary.AsyncCFTNotaryService import net.corda.core.internal.notary.NotaryServiceFlow +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow @@ -12,12 +12,12 @@ import java.security.PublicKey /** Notary service backed by a replicated MySQL database. */ class JPANotaryService( override val services: ServiceHubInternal, - override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() { + override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { private val notaryConfig = services.configuration.notary ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") - override val asyncUniquenessProvider = with(services) { + override val uniquenessProvider = with(services) { val jpaNotaryConfig = try { notaryConfig.extraConfig!!.parseAs() } catch (e: Exception) { @@ -36,6 +36,6 @@ class JPANotaryService( } override fun stop() { - asyncUniquenessProvider.stop() + uniquenessProvider.stop() } } diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt index d4a3104bd5..6172b9f18f 100644 --- a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt +++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt @@ -12,8 +12,8 @@ import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.notary.AsyncUniquenessProvider import net.corda.core.internal.notary.NotaryInternalException +import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.internal.notary.isConsumedByTheSameTx import net.corda.core.internal.notary.validateTimeWindow import net.corda.core.serialization.CordaSerializable @@ -38,7 +38,7 @@ import kotlin.concurrent.thread /** A JPA backed Uniqueness provider */ @ThreadSafe -class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val config: JPANotaryConfiguration) : AsyncUniquenessProvider, SingletonSerializeAsToken() { +class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val config: JPANotaryConfiguration) : UniquenessProvider, SingletonSerializeAsToken() { // TODO: test vs. MySQLUniquenessProvider @@ -75,7 +75,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va val requestSignature: NotarisationRequestSignature, val timeWindow: TimeWindow?, val references: List, - val future: OpenFuture, + val future: OpenFuture, val requestEntity: Request, val committedStatesEntities: List) @@ -132,15 +132,15 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va * * Returns a future that will complete once the requestEntitiy is processed, containing the commit [Result]. */ - override fun commitAsync( + override fun commit( states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List - ): CordaFuture { - val future = openFuture() + ): CordaFuture { + val future = openFuture() val requestEntities = Request(consumingTxHash = txId.toString(), partyName = callerIdentity.name.toString(), requestSignature = requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY)).bytes, @@ -215,7 +215,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va return findAlreadyCommitted(session, allInputs, references).toMutableMap() } - private fun processRequest(request: CommitRequest, allConflicts: MutableMap, toCommit: MutableList): AsyncUniquenessProvider.Result { + private fun processRequest(request: CommitRequest, allConflicts: MutableMap, toCommit: MutableList): UniquenessProvider.Result { val conflicts = (request.states + request.references).mapNotNull { if (allConflicts.containsKey(it)) it to allConflicts[it]!! @@ -223,9 +223,9 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va }.toMap() val result = if (conflicts.isNotEmpty()) { if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) { - AsyncUniquenessProvider.Result.Success + UniquenessProvider.Result.Success } else { - AsyncUniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts)) + UniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts)) } } else { val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow) @@ -235,9 +235,9 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va request.states.forEach { allConflicts[it] = StateConsumptionDetails(request.txId.sha256()) } - AsyncUniquenessProvider.Result.Success + UniquenessProvider.Result.Success } else { - AsyncUniquenessProvider.Result.Failure(outsideTimeWindowError) + UniquenessProvider.Result.Failure(outsideTimeWindowError) } } return result @@ -275,7 +275,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va private fun respondWithError(request: CommitRequest, exception: Exception) { if (exception is NotaryInternalException) { - request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error)) + request.future.set(UniquenessProvider.Result.Failure(exception.error)) } else { request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) } diff --git a/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt b/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt index 6e4255482e..453b689cd2 100644 --- a/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt +++ b/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt @@ -7,10 +7,8 @@ import net.corda.core.crypto.sha256 import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.notary.NotaryInternalException -import net.corda.node.services.config.NotaryConfig +import net.corda.core.internal.notary.UniquenessProvider import net.corda.node.services.schema.NodeSchemaService -import net.corda.nodeapi.internal.config.toConfig import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef @@ -27,7 +25,6 @@ import org.junit.Rule import org.junit.Test import java.time.Clock import kotlin.test.assertEquals -import kotlin.test.assertFailsWith class JPAUniquenessProviderTests { @Rule @@ -54,11 +51,12 @@ class JPAUniquenessProviderTests { } @Test - fun `should commit a transaction with unused inputs without exception`() { + fun `should successfully commit a transaction with unused inputs`() { val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig) val inputState = generateStateRef() - provider.commit(listOf(inputState), txID, identity, requestSignature) + val result = provider.commit(listOf(inputState), txID, identity, requestSignature).get() + assertEquals(UniquenessProvider.Result.Success, result) } @Test @@ -68,13 +66,12 @@ class JPAUniquenessProviderTests { val inputs = listOf(inputState) val firstTxId = txID - provider.commit(inputs, firstTxId, identity, requestSignature) + val firstResult = provider.commit(inputs, firstTxId, identity, requestSignature).get() + assertEquals(UniquenessProvider.Result.Success, firstResult) val secondTxId = SecureHash.randomSHA256() - val ex = assertFailsWith { - provider.commit(inputs, secondTxId, identity, requestSignature) - } - val error = ex.error as NotaryError.Conflict + val secondResult = provider.commit(inputs, secondTxId, identity, requestSignature).get() + val error = (secondResult as UniquenessProvider.Result.Failure).error as NotaryError.Conflict val conflictCause = error.consumedStates[inputState]!! assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256()) @@ -91,14 +88,15 @@ class JPAUniquenessProviderTests { val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates / 2 val stateRefs = (1..nrStates).map { generateStateRef() } println(stateRefs.size) + val firstTxId = SecureHash.randomSHA256() val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig) - provider.commit(stateRefs, firstTxId, identity, requestSignature) + val firstResult = provider.commit(stateRefs, firstTxId, identity, requestSignature).get() + assertEquals(UniquenessProvider.Result.Success, firstResult) + val secondTxId = SecureHash.randomSHA256() - val ex = assertFailsWith { - provider.commit(stateRefs, secondTxId, identity, requestSignature) - } - val error = ex.error as NotaryError.Conflict + val secondResult = provider.commit(stateRefs, secondTxId, identity, requestSignature).get() + val error = (secondResult as UniquenessProvider.Result.Failure).error as NotaryError.Conflict assertEquals(nrStates, error.consumedStates.size) } } diff --git a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryService.kt b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryService.kt index 65d7f7a731..56f8a3682e 100644 --- a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryService.kt +++ b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLNotaryService.kt @@ -1,8 +1,8 @@ package net.corda.notary.mysql import net.corda.core.flows.FlowSession -import net.corda.core.internal.notary.AsyncCFTNotaryService import net.corda.core.internal.notary.NotaryServiceFlow +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow @@ -12,7 +12,7 @@ import java.security.PublicKey /** Notary service backed by a replicated MySQL database. */ class MySQLNotaryService( override val services: ServiceHubInternal, - override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() { + override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { /** Database table will be automatically created in dev mode */ private val devMode = services.configuration.devMode @@ -20,7 +20,7 @@ class MySQLNotaryService( private val notaryConfig = services.configuration.notary ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") - override val asyncUniquenessProvider = with(services) { + override val uniquenessProvider = with(services) { val mysqlConfig = try { notaryConfig.extraConfig!!.parseAs() } catch (e: Exception) { @@ -41,10 +41,10 @@ class MySQLNotaryService( override fun start() { - if (devMode) asyncUniquenessProvider.createTable() + if (devMode) uniquenessProvider.createTable() } override fun stop() { - asyncUniquenessProvider.stop() + uniquenessProvider.stop() } } \ No newline at end of file diff --git a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt index 8bc01fd3f6..445540ec5d 100644 --- a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt +++ b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt @@ -19,9 +19,9 @@ import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.notary.AsyncUniquenessProvider -import net.corda.core.internal.notary.AsyncUniquenessProvider.Result import net.corda.core.internal.notary.NotaryInternalException +import net.corda.core.internal.notary.UniquenessProvider +import net.corda.core.internal.notary.UniquenessProvider.Result import net.corda.core.internal.notary.isConsumedByTheSameTx import net.corda.core.internal.notary.validateTimeWindow import net.corda.core.serialization.SerializationDefaults @@ -49,7 +49,7 @@ class MySQLUniquenessProvider( metrics: MetricRegistry, val clock: Clock, val config: MySQLNotaryConfiguration -) : AsyncUniquenessProvider, SingletonSerializeAsToken() { +) : UniquenessProvider, SingletonSerializeAsToken() { companion object { private val log = loggerFor() // TODO: optimize table schema for InnoDB @@ -161,7 +161,7 @@ class MySQLUniquenessProvider( * * Returns a future that will complete once the request is processed, containing the commit [Result]. */ - override fun commitAsync( + override fun commit( states: List, txId: SecureHash, callerIdentity: Party, diff --git a/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt b/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt index 1de0acac4f..1f3ab696ca 100644 --- a/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt +++ b/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt @@ -15,8 +15,8 @@ import net.corda.core.flows.* import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.concurrent.transpose -import net.corda.core.internal.notary.AsyncCFTNotaryService -import net.corda.core.internal.notary.AsyncUniquenessProvider.Result +import net.corda.core.internal.notary.SinglePartyNotaryService +import net.corda.core.internal.notary.UniquenessProvider.Result import net.corda.core.internal.notary.generateSignature import net.corda.core.node.NotaryInfo import net.corda.core.transactions.TransactionBuilder @@ -223,7 +223,7 @@ class MySQLNotaryServiceTests : IntegrationTest() { if (requestSignature == null || random.nextInt(10) < 2) { requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub) } - futures += AsyncCFTNotaryService.CommitOperation( + futures += SinglePartyNotaryService.CommitOperation( service, inputs, txId, diff --git a/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/AsyncLoadTestFlow.kt b/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/AsyncLoadTestFlow.kt index 4a4a5155b4..8cc04b020d 100644 --- a/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/AsyncLoadTestFlow.kt +++ b/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/AsyncLoadTestFlow.kt @@ -10,20 +10,19 @@ import net.corda.core.crypto.entropyToKeyPair import net.corda.core.crypto.generateKeyPair import net.corda.core.flows.FlowLogic import net.corda.core.flows.NotarisationRequest -import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.StartableByRPC import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.concurrent.transpose -import net.corda.core.internal.notary.AsyncCFTNotaryService -import net.corda.core.internal.notary.AsyncUniquenessProvider +import net.corda.core.internal.notary.SinglePartyNotaryService +import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.internal.notary.generateSignature import java.math.BigInteger import java.util.* import java.util.concurrent.TimeUnit @StartableByRPC -open class AsyncLoadTestFlow( +open class AsyncLoadTestFlow( private val serviceType: Class, private val transactionCount: Int, private val batchSize: Int = 100, @@ -57,7 +56,7 @@ open class AsyncLoadTestFlow( private fun runBatch(transactionCount: Int): Long { val stopwatch = Stopwatch.createStarted() - val futures = mutableListOf>() + val futures = mutableListOf>() val service = serviceHub.cordaService(serviceType) @@ -72,7 +71,7 @@ open class AsyncLoadTestFlow( val inputs = inputGenerator.generateOrFail(random) val requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub) - futures += AsyncCFTNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature, + futures += SinglePartyNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature, null, emptyList()).execute("") } diff --git a/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/LoadTestFlow.kt b/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/LoadTestFlow.kt deleted file mode 100644 index b0b78e084e..0000000000 --- a/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/LoadTestFlow.kt +++ /dev/null @@ -1,67 +0,0 @@ -package net.corda.notarytest.flows - -import co.paralleluniverse.fibers.Suspendable -import com.google.common.base.Stopwatch -import net.corda.client.mock.Generator -import net.corda.core.contracts.StateRef -import net.corda.core.crypto.SecureHash -import net.corda.core.crypto.entropyToKeyPair -import net.corda.core.crypto.generateKeyPair -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.NotarisationRequest -import net.corda.core.flows.StartableByRPC -import net.corda.core.identity.CordaX500Name -import net.corda.core.identity.Party -import net.corda.core.internal.notary.TrustedAuthorityNotaryService -import net.corda.core.internal.notary.generateSignature -import java.math.BigInteger -import java.util.* -import java.util.concurrent.TimeUnit - -@StartableByRPC -open class LoadTestFlow( - private val serviceType: Class, - private val transactionCount: Int, - /** - * Number of input states per transaction. - * If *null*, variable sized transactions will be created with median size 4. - */ - private val inputStateCount: Int? -) : FlowLogic() { - private val keyPairGenerator = Generator.long().map { entropyToKeyPair(BigInteger.valueOf(it)) } - private val publicKeyGenerator = keyPairGenerator.map { it.public } - - private val publicKeyGenerator2 = Generator.pure(generateKeyPair().public) - private val partyGenerator: Generator = Generator.int().combine(publicKeyGenerator2) { n, key -> - Party(CordaX500Name(organisation = "Party$n", locality = "London", country = "GB"), key) - } - private val txIdGenerator = Generator.bytes(32).map { SecureHash.sha256(it) } - private val stateRefGenerator = Generator.intRange(0, 10).map { StateRef(SecureHash.randomSHA256(), it) } - - @Suspendable - override fun call(): Long { - val stopwatch = Stopwatch.createStarted() - val random = SplittableRandom() - - for (i in 1..transactionCount) { - val txId: SecureHash = txIdGenerator.generateOrFail(random) - val callerParty = partyGenerator.generateOrFail(random) - val inputGenerator = if (inputStateCount == null) { - Generator.replicatePoisson(4.0, stateRefGenerator, true) - } else { - Generator.replicate(inputStateCount, stateRefGenerator) - } - val inputs = inputGenerator.generateOrFail(random) - val localStopwatch = Stopwatch.createStarted() - val sig = NotarisationRequest(inputs, txId).generateSignature(serviceHub) - serviceHub.cordaService(serviceType).commitInputStates(inputs, txId, callerParty, sig, null) - logger.info("Committed a transaction ${txId.toString().take(10)} with ${inputs.size} inputs in ${localStopwatch.stop().elapsed(TimeUnit.MILLISECONDS)} ms") - } - - stopwatch.stop() - val duration = stopwatch.elapsed(TimeUnit.MILLISECONDS) - logger.info("Committed $transactionCount transactions in $duration, avg ${duration.toDouble() / transactionCount} ms") - - return duration - } -} \ No newline at end of file diff --git a/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt b/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt index 78ec3ce6f7..ef692a5833 100644 --- a/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt +++ b/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt @@ -7,7 +7,7 @@ import com.codahale.metrics.graphite.PickledGraphite import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession import net.corda.core.flows.StartableByRPC -import net.corda.core.internal.notary.AsyncCFTNotaryService +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.utilities.NetworkHostAndPort @@ -24,19 +24,19 @@ import java.security.PublicKey import java.util.concurrent.TimeUnit @CordaService -class JDBCNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() { +class JDBCNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { private val appConfig = ConfigHelper.loadConfig(Paths.get(".")).getConfig("custom") - override val asyncUniquenessProvider: MySQLUniquenessProvider = createUniquenessProvider() + override val uniquenessProvider: MySQLUniquenessProvider = createUniquenessProvider() override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = NonValidatingNotaryFlow(otherPartySession, this) override fun start() { - asyncUniquenessProvider.createTable() + uniquenessProvider.createTable() } override fun stop() { - asyncUniquenessProvider.stop() + uniquenessProvider.stop() } private fun createMetricsRegistry(): MetricRegistry {