ENT-1858: All non-bft notary services are now async

This commit is contained in:
Andrius Dagys 2018-11-02 09:20:20 +00:00
parent 539fac0d57
commit b9ff958011
11 changed files with 54 additions and 200 deletions

View File

@ -1,77 +0,0 @@
package net.corda.core.internal.notary
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.executeAsync
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.serialization.CordaSerializable
/** Base notary implementation for a notary that supports asynchronous calls from a flow. */
abstract class AsyncCFTNotaryService : TrustedAuthorityNotaryService() {
override val uniquenessProvider: UniquenessProvider get() = asyncUniquenessProvider
/** A uniqueness provider that supports asynchronous commits. */
protected abstract val asyncUniquenessProvider: AsyncUniquenessProvider
/**
* @throws NotaryInternalException if any of the states have been consumed by a different transaction.
*/
@Suspendable
override fun commitInputStates(
inputs: List<StateRef>,
txId: SecureHash,
caller: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef>
) {
// TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance
// when we concurrently log requests here as part of the flows, instead of logging sequentially in the
// `UniquenessProvider`.
val result = FlowLogic.currentTopLevel!!.executeAsync(AsyncCFTNotaryService.CommitOperation(this, inputs, txId, caller, requestSignature, timeWindow, references))
if (result is Result.Failure) throw NotaryInternalException(result.error)
}
/**
* Commits the provided input states asynchronously.
*
* If a consumed state conflict is reported by the [asyncUniquenessProvider], but it is caused by the same
* transaction the transaction is getting notarised twice a success response will be returned.
*/
private fun commitAsync(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef>
): CordaFuture<Result> {
return asyncUniquenessProvider.commitAsync(states, txId, callerIdentity, requestSignature, timeWindow, references)
}
/**
* Required for the flow to be able to suspend until the commit is complete.
* This object will be included in the flow checkpoint.
*/
@CordaSerializable
class CommitOperation(
val service: AsyncCFTNotaryService,
val inputs: List<StateRef>,
val txId: SecureHash,
val caller: Party,
val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?,
val references: List<StateRef>
): FlowAsyncOperation<Result> {
override fun execute(deduplicationId: String): CordaFuture<Result> {
return service.commitAsync(inputs, txId, caller, requestSignature, timeWindow, references)
}
}
}

View File

@ -6,6 +6,7 @@ import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.IdempotentFlow
import net.corda.core.utilities.unwrap
/**
@ -17,7 +18,7 @@ import net.corda.core.utilities.unwrap
* Additional transaction validation logic can be added when implementing [validateRequest].
*/
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic<Void?>() {
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic<Void?>(), IdempotentFlow {
companion object {
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
private const val maxAllowedInputsAndReferences = 10_000

View File

@ -1,8 +1,8 @@
package net.corda.notary.jpa
import net.corda.core.flows.FlowSession
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow
@ -12,12 +12,12 @@ import java.security.PublicKey
/** Notary service backed by a replicated MySQL database. */
class JPANotaryService(
override val services: ServiceHubInternal,
override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() {
override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
private val notaryConfig = services.configuration.notary
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
override val asyncUniquenessProvider = with(services) {
override val uniquenessProvider = with(services) {
val jpaNotaryConfig = try {
notaryConfig.extraConfig!!.parseAs<JPANotaryConfiguration>()
} catch (e: Exception) {
@ -36,6 +36,6 @@ class JPANotaryService(
}
override fun stop() {
asyncUniquenessProvider.stop()
uniquenessProvider.stop()
}
}

View File

@ -12,8 +12,8 @@ import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.AsyncUniquenessProvider
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
import net.corda.core.serialization.CordaSerializable
@ -38,7 +38,7 @@ import kotlin.concurrent.thread
/** A JPA backed Uniqueness provider */
@ThreadSafe
class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val config: JPANotaryConfiguration) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val config: JPANotaryConfiguration) : UniquenessProvider, SingletonSerializeAsToken() {
// TODO: test vs. MySQLUniquenessProvider
@ -75,7 +75,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?,
val references: List<StateRef>,
val future: OpenFuture<AsyncUniquenessProvider.Result>,
val future: OpenFuture<UniquenessProvider.Result>,
val requestEntity: Request,
val committedStatesEntities: List<CommittedState>)
@ -132,15 +132,15 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
*
* Returns a future that will complete once the requestEntitiy is processed, containing the commit [Result].
*/
override fun commitAsync(
override fun commit(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef>
): CordaFuture<AsyncUniquenessProvider.Result> {
val future = openFuture<AsyncUniquenessProvider.Result>()
): CordaFuture<UniquenessProvider.Result> {
val future = openFuture<UniquenessProvider.Result>()
val requestEntities = Request(consumingTxHash = txId.toString(),
partyName = callerIdentity.name.toString(),
requestSignature = requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY)).bytes,
@ -215,7 +215,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
return findAlreadyCommitted(session, allInputs, references).toMutableMap()
}
private fun processRequest(request: CommitRequest, allConflicts: MutableMap<StateRef, StateConsumptionDetails>, toCommit: MutableList<CommitRequest>): AsyncUniquenessProvider.Result {
private fun processRequest(request: CommitRequest, allConflicts: MutableMap<StateRef, StateConsumptionDetails>, toCommit: MutableList<CommitRequest>): UniquenessProvider.Result {
val conflicts = (request.states + request.references).mapNotNull {
if (allConflicts.containsKey(it)) it to allConflicts[it]!!
@ -223,9 +223,9 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
}.toMap()
val result = if (conflicts.isNotEmpty()) {
if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) {
AsyncUniquenessProvider.Result.Success
UniquenessProvider.Result.Success
} else {
AsyncUniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts))
UniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts))
}
} else {
val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow)
@ -235,9 +235,9 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
request.states.forEach {
allConflicts[it] = StateConsumptionDetails(request.txId.sha256())
}
AsyncUniquenessProvider.Result.Success
UniquenessProvider.Result.Success
} else {
AsyncUniquenessProvider.Result.Failure(outsideTimeWindowError)
UniquenessProvider.Result.Failure(outsideTimeWindowError)
}
}
return result
@ -275,7 +275,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
private fun respondWithError(request: CommitRequest, exception: Exception) {
if (exception is NotaryInternalException) {
request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
request.future.set(UniquenessProvider.Result.Failure(exception.error))
} else {
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
}

View File

@ -7,10 +7,8 @@ import net.corda.core.crypto.sha256
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.node.services.config.NotaryConfig
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef
@ -27,7 +25,6 @@ import org.junit.Rule
import org.junit.Test
import java.time.Clock
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class JPAUniquenessProviderTests {
@Rule
@ -54,11 +51,12 @@ class JPAUniquenessProviderTests {
}
@Test
fun `should commit a transaction with unused inputs without exception`() {
fun `should successfully commit a transaction with unused inputs`() {
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
val inputState = generateStateRef()
provider.commit(listOf(inputState), txID, identity, requestSignature)
val result = provider.commit(listOf(inputState), txID, identity, requestSignature).get()
assertEquals(UniquenessProvider.Result.Success, result)
}
@Test
@ -68,13 +66,12 @@ class JPAUniquenessProviderTests {
val inputs = listOf(inputState)
val firstTxId = txID
provider.commit(inputs, firstTxId, identity, requestSignature)
val firstResult = provider.commit(inputs, firstTxId, identity, requestSignature).get()
assertEquals(UniquenessProvider.Result.Success, firstResult)
val secondTxId = SecureHash.randomSHA256()
val ex = assertFailsWith<NotaryInternalException> {
provider.commit(inputs, secondTxId, identity, requestSignature)
}
val error = ex.error as NotaryError.Conflict
val secondResult = provider.commit(inputs, secondTxId, identity, requestSignature).get()
val error = (secondResult as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
val conflictCause = error.consumedStates[inputState]!!
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
@ -91,14 +88,15 @@ class JPAUniquenessProviderTests {
val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates / 2
val stateRefs = (1..nrStates).map { generateStateRef() }
println(stateRefs.size)
val firstTxId = SecureHash.randomSHA256()
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
provider.commit(stateRefs, firstTxId, identity, requestSignature)
val firstResult = provider.commit(stateRefs, firstTxId, identity, requestSignature).get()
assertEquals(UniquenessProvider.Result.Success, firstResult)
val secondTxId = SecureHash.randomSHA256()
val ex = assertFailsWith<NotaryInternalException> {
provider.commit(stateRefs, secondTxId, identity, requestSignature)
}
val error = ex.error as NotaryError.Conflict
val secondResult = provider.commit(stateRefs, secondTxId, identity, requestSignature).get()
val error = (secondResult as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
assertEquals(nrStates, error.consumedStates.size)
}
}

View File

@ -1,8 +1,8 @@
package net.corda.notary.mysql
import net.corda.core.flows.FlowSession
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow
@ -12,7 +12,7 @@ import java.security.PublicKey
/** Notary service backed by a replicated MySQL database. */
class MySQLNotaryService(
override val services: ServiceHubInternal,
override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() {
override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
/** Database table will be automatically created in dev mode */
private val devMode = services.configuration.devMode
@ -20,7 +20,7 @@ class MySQLNotaryService(
private val notaryConfig = services.configuration.notary
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
override val asyncUniquenessProvider = with(services) {
override val uniquenessProvider = with(services) {
val mysqlConfig = try {
notaryConfig.extraConfig!!.parseAs<MySQLNotaryConfiguration>()
} catch (e: Exception) {
@ -41,10 +41,10 @@ class MySQLNotaryService(
override fun start() {
if (devMode) asyncUniquenessProvider.createTable()
if (devMode) uniquenessProvider.createTable()
}
override fun stop() {
asyncUniquenessProvider.stop()
uniquenessProvider.stop()
}
}

View File

@ -19,9 +19,9 @@ import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.AsyncUniquenessProvider
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.internal.notary.UniquenessProvider.Result
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
import net.corda.core.serialization.SerializationDefaults
@ -49,7 +49,7 @@ class MySQLUniquenessProvider(
metrics: MetricRegistry,
val clock: Clock,
val config: MySQLNotaryConfiguration
) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
) : UniquenessProvider, SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<MySQLUniquenessProvider>()
// TODO: optimize table schema for InnoDB
@ -161,7 +161,7 @@ class MySQLUniquenessProvider(
*
* Returns a future that will complete once the request is processed, containing the commit [Result].
*/
override fun commitAsync(
override fun commit(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,

View File

@ -15,8 +15,8 @@ import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.UniquenessProvider.Result
import net.corda.core.internal.notary.generateSignature
import net.corda.core.node.NotaryInfo
import net.corda.core.transactions.TransactionBuilder
@ -223,7 +223,7 @@ class MySQLNotaryServiceTests : IntegrationTest() {
if (requestSignature == null || random.nextInt(10) < 2) {
requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
}
futures += AsyncCFTNotaryService.CommitOperation(
futures += SinglePartyNotaryService.CommitOperation(
service,
inputs,
txId,

View File

@ -10,20 +10,19 @@ import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotarisationRequest
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.core.internal.notary.AsyncUniquenessProvider
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.internal.notary.generateSignature
import java.math.BigInteger
import java.util.*
import java.util.concurrent.TimeUnit
@StartableByRPC
open class AsyncLoadTestFlow<T : AsyncCFTNotaryService>(
open class AsyncLoadTestFlow<T : SinglePartyNotaryService>(
private val serviceType: Class<T>,
private val transactionCount: Int,
private val batchSize: Int = 100,
@ -57,7 +56,7 @@ open class AsyncLoadTestFlow<T : AsyncCFTNotaryService>(
private fun runBatch(transactionCount: Int): Long {
val stopwatch = Stopwatch.createStarted()
val futures = mutableListOf<CordaFuture<AsyncUniquenessProvider.Result>>()
val futures = mutableListOf<CordaFuture<UniquenessProvider.Result>>()
val service = serviceHub.cordaService(serviceType)
@ -72,7 +71,7 @@ open class AsyncLoadTestFlow<T : AsyncCFTNotaryService>(
val inputs = inputGenerator.generateOrFail(random)
val requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
futures += AsyncCFTNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature,
futures += SinglePartyNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature,
null, emptyList()).execute("")
}

View File

@ -1,67 +0,0 @@
package net.corda.notarytest.flows
import co.paralleluniverse.fibers.Suspendable
import com.google.common.base.Stopwatch
import net.corda.client.mock.Generator
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotarisationRequest
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.internal.notary.generateSignature
import java.math.BigInteger
import java.util.*
import java.util.concurrent.TimeUnit
@StartableByRPC
open class LoadTestFlow<T : TrustedAuthorityNotaryService>(
private val serviceType: Class<T>,
private val transactionCount: Int,
/**
* Number of input states per transaction.
* If *null*, variable sized transactions will be created with median size 4.
*/
private val inputStateCount: Int?
) : FlowLogic<Long>() {
private val keyPairGenerator = Generator.long().map { entropyToKeyPair(BigInteger.valueOf(it)) }
private val publicKeyGenerator = keyPairGenerator.map { it.public }
private val publicKeyGenerator2 = Generator.pure(generateKeyPair().public)
private val partyGenerator: Generator<Party> = Generator.int().combine(publicKeyGenerator2) { n, key ->
Party(CordaX500Name(organisation = "Party$n", locality = "London", country = "GB"), key)
}
private val txIdGenerator = Generator.bytes(32).map { SecureHash.sha256(it) }
private val stateRefGenerator = Generator.intRange(0, 10).map { StateRef(SecureHash.randomSHA256(), it) }
@Suspendable
override fun call(): Long {
val stopwatch = Stopwatch.createStarted()
val random = SplittableRandom()
for (i in 1..transactionCount) {
val txId: SecureHash = txIdGenerator.generateOrFail(random)
val callerParty = partyGenerator.generateOrFail(random)
val inputGenerator = if (inputStateCount == null) {
Generator.replicatePoisson(4.0, stateRefGenerator, true)
} else {
Generator.replicate(inputStateCount, stateRefGenerator)
}
val inputs = inputGenerator.generateOrFail(random)
val localStopwatch = Stopwatch.createStarted()
val sig = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
serviceHub.cordaService(serviceType).commitInputStates(inputs, txId, callerParty, sig, null)
logger.info("Committed a transaction ${txId.toString().take(10)} with ${inputs.size} inputs in ${localStopwatch.stop().elapsed(TimeUnit.MILLISECONDS)} ms")
}
stopwatch.stop()
val duration = stopwatch.elapsed(TimeUnit.MILLISECONDS)
logger.info("Committed $transactionCount transactions in $duration, avg ${duration.toDouble() / transactionCount} ms")
return duration
}
}

View File

@ -7,7 +7,7 @@ import com.codahale.metrics.graphite.PickledGraphite
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.utilities.NetworkHostAndPort
@ -24,19 +24,19 @@ import java.security.PublicKey
import java.util.concurrent.TimeUnit
@CordaService
class JDBCNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() {
class JDBCNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
private val appConfig = ConfigHelper.loadConfig(Paths.get(".")).getConfig("custom")
override val asyncUniquenessProvider: MySQLUniquenessProvider = createUniquenessProvider()
override val uniquenessProvider: MySQLUniquenessProvider = createUniquenessProvider()
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = NonValidatingNotaryFlow(otherPartySession, this)
override fun start() {
asyncUniquenessProvider.createTable()
uniquenessProvider.createTable()
}
override fun stop() {
asyncUniquenessProvider.stop()
uniquenessProvider.stop()
}
private fun createMetricsRegistry(): MetricRegistry {