diff --git a/.idea/compiler.xml b/.idea/compiler.xml
index 40b3511c77..0e83636592 100644
--- a/.idea/compiler.xml
+++ b/.idea/compiler.xml
@@ -187,6 +187,8 @@
+
+
diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt
index e4205808f5..d8d111d2c2 100644
--- a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt
@@ -1,12 +1,15 @@
package net.corda.core.internal.notary
+import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
+import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
+import net.corda.core.internal.executeAsync
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.serialization.CordaSerializable
@@ -16,6 +19,25 @@ abstract class AsyncCFTNotaryService : TrustedAuthorityNotaryService() {
/** A uniqueness provider that supports asynchronous commits. */
protected abstract val asyncUniquenessProvider: AsyncUniquenessProvider
+ /**
+ * @throws NotaryInternalException if any of the states have been consumed by a different transaction.
+ */
+ @Suspendable
+ override fun commitInputStates(
+ inputs: List,
+ txId: SecureHash,
+ caller: Party,
+ requestSignature: NotarisationRequestSignature,
+ timeWindow: TimeWindow?,
+ references: List
+ ) {
+ // TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance
+ // when we concurrently log requests here as part of the flows, instead of logging sequentially in the
+ // `UniquenessProvider`.
+ val result = FlowLogic.currentTopLevel!!.executeAsync(AsyncCFTNotaryService.CommitOperation(this, inputs, txId, caller, requestSignature, timeWindow, references))
+ if (result is Result.Failure) throw NotaryInternalException(result.error)
+ }
+
/**
* Commits the provided input states asynchronously.
*
diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt
index 1d9328de88..36593eca76 100644
--- a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt
@@ -18,7 +18,7 @@ interface AsyncUniquenessProvider : UniquenessProvider {
/** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */
override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List) {
- val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow,references).get()
+ val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow, references).get()
if (result is Result.Failure) {
throw NotaryInternalException(result.error)
}
diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt
index 28a1548e3d..0dae619cd8 100644
--- a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt
@@ -6,6 +6,7 @@ import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.identity.Party
+import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.executeAsync
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.utilities.unwrap
@@ -19,7 +20,7 @@ import net.corda.core.utilities.unwrap
* Additional transaction validation logic can be added when implementing [validateRequest].
*/
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
-abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic() {
+abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic(), IdempotentFlow {
companion object {
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
private const val maxAllowedInputsAndReferences = 10_000
@@ -36,12 +37,7 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
val parts = validateRequest(requestPayload)
txId = parts.id
checkNotary(parts.notary)
- if (service is AsyncCFTNotaryService) {
- val result = executeAsync(AsyncCFTNotaryService.CommitOperation(service, parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references))
- if (result is Result.Failure) throw NotaryInternalException(result.error)
- } else {
- service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references)
- }
+ service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references)
signTransactionAndSendResponse(txId)
} catch (e: NotaryInternalException) {
throw NotaryException(e.error, txId)
diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt
index 148b506e82..a9466b11b4 100644
--- a/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt
@@ -1,5 +1,6 @@
package net.corda.core.internal.notary
+import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.*
@@ -22,11 +23,12 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
protected abstract val uniquenessProvider: UniquenessProvider
/**
- * A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
+ * @throws NotaryException if any of the states have been consumed by a different transaction. Note that
* this method does not throw an exception when input states are present multiple times within the transaction.
*/
@JvmOverloads
- fun commitInputStates(
+ @Suspendable
+ open fun commitInputStates(
inputs: List,
txId: SecureHash,
caller: Party,
diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt
index a7d557911e..2f90104588 100644
--- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt
+++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt
@@ -138,7 +138,12 @@ data class NotaryConfig(val validating: Boolean,
val custom: Boolean = false,
val mysql: MySQLConfiguration? = null,
val serviceLegalName: CordaX500Name? = null,
- val className: String = "net.corda.node.services.transactions.SimpleNotaryService"
+ val className: String = "net.corda.node.services.transactions.SimpleNotaryService",
+ val batchSize: Int = 128,
+ val batchTimeoutMs: Long = 1L,
+ val maxInputStates: Int = 2000,
+ val maxDBTransactionRetryCount: Int = 10,
+ val backOffBaseMs: Long = 20L
) {
init {
require(raft == null || bftSMaRt == null || !custom || mysql == null) {
diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt
index 2367ac5fa6..6a1fcb26e8 100644
--- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt
@@ -17,7 +17,6 @@ import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.RunOnceService
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
-import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.node.services.vault.VaultSchemaV1
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt
index d382b2feae..b4f42baebe 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt
@@ -37,4 +37,4 @@ object NodeNotarySchemaV1 : MappedSchema(schemaFamily = NodeNotarySchema.javaCla
PersistentUniquenessProvider.CommittedState::class.java
)) {
override val migrationResource = "node-notary.changelog-master"
-}
\ No newline at end of file
+}
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/node/src/main/resources/migration/node-notary.changelog-init.xml b/node/src/main/resources/migration/node-notary.changelog-init.xml
index 8d0f1bcb6f..d50fcd06a6 100644
--- a/node/src/main/resources/migration/node-notary.changelog-init.xml
+++ b/node/src/main/resources/migration/node-notary.changelog-init.xml
@@ -55,4 +55,4 @@
-
\ No newline at end of file
+
diff --git a/node/src/main/resources/migration/node-notary.changelog-master.xml b/node/src/main/resources/migration/node-notary.changelog-master.xml
index 9f169670fc..9a33a10e78 100644
--- a/node/src/main/resources/migration/node-notary.changelog-master.xml
+++ b/node/src/main/resources/migration/node-notary.changelog-master.xml
@@ -5,8 +5,6 @@
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
-
-
diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt
index 463a4120d9..7b79ff93b4 100644
--- a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt
@@ -59,19 +59,21 @@ class PersistentUniquenessProviderTests {
@Test
fun `should report a conflict for a transaction with previously used inputs`() {
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
- val inputState = generateStateRef()
+ val inputState = generateStateRef()
- val inputs = listOf(inputState)
- val firstTxId = txID
- provider.commit(inputs, firstTxId, identity, requestSignature)
+ val inputs = listOf(inputState)
+ val firstTxId = txID
+ provider.commit(inputs, firstTxId, identity, requestSignature)
- val secondTxId = SecureHash.randomSHA256()
- val ex = assertFailsWith {
- provider.commit(inputs, secondTxId, identity, requestSignature)
- }
- val error = ex.error as NotaryError.Conflict
+ provider.commit(inputs, firstTxId, identity, requestSignature)
- val conflictCause = error.consumedStates[inputState]!!
- assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
+ val secondTxId = SecureHash.randomSHA256()
+ val ex = assertFailsWith {
+ provider.commit(inputs, secondTxId, identity, requestSignature)
}
+ val error = ex.error as NotaryError.Conflict
+
+ val conflictCause = error.consumedStates[inputState]!!
+ assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
+ }
}
diff --git a/notary/jpa/build.gradle b/notary/jpa/build.gradle
new file mode 100644
index 0000000000..6bd152af34
--- /dev/null
+++ b/notary/jpa/build.gradle
@@ -0,0 +1,27 @@
+apply plugin: 'kotlin'
+apply plugin: 'kotlin-jpa'
+apply plugin: 'idea'
+apply plugin: 'net.corda.plugins.cordapp'
+apply plugin: 'net.corda.plugins.publish-utils'
+apply plugin: 'net.corda.plugins.quasar-utils'
+
+dependencies {
+ cordaCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+
+ // Corda integration dependencies
+ cordaCompile project(':node')
+
+ testCompile "junit:junit:$junit_version"
+ testCompile project(':node-driver')
+}
+
+idea {
+ module {
+ downloadJavadoc = true // defaults to false
+ downloadSources = true
+ }
+}
+
+publish {
+ name 'corda-notary-jpa'
+}
diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt
new file mode 100644
index 0000000000..a1051bdcde
--- /dev/null
+++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt
@@ -0,0 +1,35 @@
+package net.corda.notary.jpa
+
+import net.corda.core.flows.FlowSession
+import net.corda.core.internal.notary.AsyncCFTNotaryService
+import net.corda.core.internal.notary.NotaryServiceFlow
+import net.corda.node.services.api.ServiceHubInternal
+import net.corda.node.services.transactions.NonValidatingNotaryFlow
+import net.corda.node.services.transactions.ValidatingNotaryFlow
+import java.security.PublicKey
+
+/** Notary service backed by a replicated MySQL database. */
+class JPANotaryService(
+ override val services: ServiceHubInternal,
+ override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() {
+
+ private val notaryConfig = services.configuration.notary
+ ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
+
+ override val asyncUniquenessProvider = with(services) {
+ JPAUniquenessProvider(services.clock, services.database, notaryConfig)
+ }
+
+ override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {
+ return if (notaryConfig.validating) {
+ ValidatingNotaryFlow(otherPartySession, this)
+ } else NonValidatingNotaryFlow(otherPartySession, this)
+ }
+
+ override fun start() {
+ }
+
+ override fun stop() {
+ asyncUniquenessProvider.stop()
+ }
+}
diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt
new file mode 100644
index 0000000000..ac3d3cd2a4
--- /dev/null
+++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt
@@ -0,0 +1,281 @@
+package net.corda.notary.jpa
+
+import com.google.common.collect.Queues
+import net.corda.core.concurrent.CordaFuture
+import net.corda.core.contracts.StateRef
+import net.corda.core.contracts.TimeWindow
+import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.sha256
+import net.corda.core.flows.NotarisationRequestSignature
+import net.corda.core.flows.NotaryError
+import net.corda.core.flows.StateConsumptionDetails
+import net.corda.core.identity.Party
+import net.corda.core.internal.concurrent.OpenFuture
+import net.corda.core.internal.concurrent.openFuture
+import net.corda.core.internal.notary.*
+import net.corda.core.serialization.CordaSerializable
+import net.corda.core.serialization.SerializationDefaults
+import net.corda.core.serialization.SingletonSerializeAsToken
+import net.corda.core.serialization.serialize
+import net.corda.core.utilities.contextLogger
+import net.corda.core.utilities.debug
+import net.corda.node.services.config.NotaryConfig
+import net.corda.nodeapi.internal.persistence.CordaPersistence
+import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
+import net.corda.serialization.internal.CordaSerializationEncoding
+import org.hibernate.Session
+import java.sql.SQLException
+import java.time.Clock
+import java.time.Instant
+import java.util.*
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.ThreadSafe
+import javax.persistence.*
+import kotlin.concurrent.thread
+
+/** A JPA backed Uniqueness provider */
+@ThreadSafe
+class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val notaryConfig: NotaryConfig) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
+
+ // TODO: test vs. MySQLUniquenessProvider
+
+ // This is the prefix of the ID in the request log table, to allow running multiple instances that access the
+ // same table.
+ val instanceId = UUID.randomUUID().toString()
+
+ @Entity
+ @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_request_log")
+ @CordaSerializable
+ class Request(
+ @Id
+ @Column(nullable = true, length=76)
+ var id: String? = null,
+
+ @Column(name = "consuming_transaction_id", nullable = true, length = 64)
+ val consumingTxHash: String?,
+
+ @Column(name = "requesting_party_name", nullable = true, length = 255)
+ var partyName: String?,
+
+ @Lob
+ @Column(name = "request_signature", nullable = false)
+ val requestSignature: ByteArray,
+
+ @Column(name = "request_timestamp", nullable = false)
+ var requestDate: Instant
+ )
+
+ private data class CommitRequest(
+ val states: List,
+ val txId: SecureHash,
+ val callerIdentity: Party,
+ val requestSignature: NotarisationRequestSignature,
+ val timeWindow: TimeWindow?,
+ val references: List,
+ val future: OpenFuture,
+ val requestEntity: Request,
+ val committedStatesEntities: List)
+
+ @Entity
+ @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_committed_states")
+ @NamedQuery(name = "CommittedState.select", query="SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids")
+ class CommittedState(
+ @Id
+ @Column(name = "state_ref", length = 73)
+ val id: String,
+ @Column(name = "consuming_transaction_id", nullable = false, length = 64)
+ val consumingTxHash: String)
+
+ private val requestQueue = LinkedBlockingQueue(requestQueueSize)
+
+ // TODO: Collect metrics.
+
+ /** A requestEntitiy processor thread. */
+ private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) {
+ try {
+ val buffer = LinkedList()
+ while (!Thread.interrupted()) {
+ val drainedSize = Queues.drain(requestQueue, buffer, notaryConfig.batchSize, notaryConfig.batchTimeoutMs, TimeUnit.MILLISECONDS)
+ if (drainedSize == 0) continue
+ processRequests(buffer)
+ buffer.clear()
+ }
+ } catch (e: InterruptedException) {
+ }
+ log.debug { "Shutting down with ${requestQueue.size} in-flight requests unprocessed." }
+ }
+
+ fun stop() {
+ processorThread.interrupt()
+ }
+
+ companion object {
+ private const val requestQueueSize = 100_000
+ private const val jdbcBatchSize = 100_000
+ private val log = contextLogger()
+
+ fun encodeStateRef(s: StateRef): String {
+ return s.txhash.toString() + ":" + s.index.toString(16)
+ }
+
+ fun decodeStateRef(s: String): StateRef {
+ return StateRef(txhash = SecureHash.parse(s.take(64)), index = s.substring(65).toInt(16))
+ }
+ }
+
+ /**
+ * Generates and adds a [CommitRequest] to the requestEntitiy queue. If the requestEntitiy queue is full, this method will block
+ * until space is available.
+ *
+ * Returns a future that will complete once the requestEntitiy is processed, containing the commit [Result].
+ */
+ override fun commitAsync(
+ states: List,
+ txId: SecureHash,
+ callerIdentity: Party,
+ requestSignature: NotarisationRequestSignature,
+ timeWindow: TimeWindow?,
+ references: List
+ ): CordaFuture {
+ val future = openFuture()
+ val requestEntities = Request(consumingTxHash = txId.toString(),
+ partyName = callerIdentity.name.toString(),
+ requestSignature = requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY)).bytes,
+ requestDate = clock.instant())
+ val stateEntities = states.map { CommittedState(encodeStateRef(it), txId.toString()) }
+ val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future, requestEntities, stateEntities)
+ requestQueue.put(request)
+ return future
+ }
+
+ // Safe up to 100k requests per second.
+ private var nextRequestId = System.currentTimeMillis() * 100
+
+ private fun logRequests(requests: List) {
+ database.transaction {
+ for (request in requests) {
+ request.requestEntity.id = instanceId + (nextRequestId++).toString(16)
+ session.persist(request.requestEntity)
+ }
+ }
+ }
+
+ private fun commitRequests(session: Session, requests: List) {
+ for (request in requests) {
+ for (cs in request.committedStatesEntities) {
+ session.persist(cs)
+ }
+ }
+ }
+
+ private fun findAlreadyCommitted(session: Session, states: List, references: List): Map {
+ val ids = (states + references).map { encodeStateRef(it) }.toSet()
+ val committedStates = mutableListOf()
+
+ for (idsBatch in ids.chunked(notaryConfig.maxInputStates)) {
+ @SuppressWarnings("unchecked")
+ val existing = session.createNamedQuery("CommittedState.select").setParameter("ids", idsBatch).resultList as List
+ committedStates.addAll(existing)
+ }
+
+ return committedStates.map {
+ val stateRef = decodeStateRef(it.id)
+ val consumingTxId = SecureHash.parse(it.consumingTxHash)
+ if (stateRef in references) {
+ stateRef to StateConsumptionDetails(hashOfTransactionId = consumingTxId, type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
+ } else {
+ stateRef to StateConsumptionDetails(consumingTxId.sha256())
+ }
+ }.toMap()
+ }
+
+ private fun withRetry(block: () -> Unit) {
+ var retryCount = 0
+ var backOff = notaryConfig.backOffBaseMs
+ while (retryCount < notaryConfig.maxDBTransactionRetryCount) {
+ try {
+ block()
+ break
+ } catch (e: SQLException) {
+ retryCount++
+ Thread.sleep(backOff)
+ backOff *= 2
+ }
+ }
+ }
+
+ private fun findAllConflicts(session: Session, requests: List): MutableMap {
+ val allInputs = requests.flatMap { it.states }
+ val references = requests.flatMap { it.references }
+ log.info("Processing notarization requests with ${allInputs.size} input states and ${references.size} references")
+
+ return findAlreadyCommitted(session, allInputs, references).toMutableMap()
+ }
+
+ private fun processRequest(request: CommitRequest, allConflicts: MutableMap, toCommit: MutableList): AsyncUniquenessProvider.Result {
+
+ val conflicts = (request.states + request.references).mapNotNull {
+ if (allConflicts.containsKey(it)) it to allConflicts[it]!!
+ else null
+ }.toMap()
+ val result = if (conflicts.isNotEmpty()) {
+ if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) {
+ AsyncUniquenessProvider.Result.Success
+ } else {
+ AsyncUniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts))
+ }
+ } else {
+ val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow)
+ if (outsideTimeWindowError == null) {
+ toCommit.add(request)
+ // Mark states as consumed to capture conflicting transactions in the same batch.
+ request.states.forEach {
+ allConflicts[it] = StateConsumptionDetails(request.txId.sha256())
+ }
+ AsyncUniquenessProvider.Result.Success
+ } else {
+ AsyncUniquenessProvider.Result.Failure(outsideTimeWindowError)
+ }
+ }
+ return result
+ }
+
+ private fun processRequests(requests: List) {
+ try {
+ // Note that there is an additional retry mechanism within the transaction itself.
+ withRetry {
+ database.transaction {
+ val em = session.entityManagerFactory.createEntityManager()
+ em.unwrap(Session::class.java).jdbcBatchSize = jdbcBatchSize
+
+ val toCommit = mutableListOf()
+ val allConflicts = findAllConflicts(session, requests)
+
+ val results = requests.map { request ->
+ processRequest(request, allConflicts, toCommit)
+ }
+ logRequests(requests)
+ commitRequests(session, toCommit)
+
+ for ((request, result) in requests.zip(results)) {
+ request.future.set(result)
+ }
+ }
+ }
+ } catch(e: Exception) {
+ log.warn("Error processing commit requests", e)
+ for (request in requests) {
+ respondWithError(request, e)
+ }
+ }
+ }
+
+ private fun respondWithError(request: CommitRequest, exception: Exception) {
+ if (exception is NotaryInternalException) {
+ request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
+ } else {
+ request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
+ }
+ }
+}
diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/Schema.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/Schema.kt
new file mode 100644
index 0000000000..7c9a614099
--- /dev/null
+++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/Schema.kt
@@ -0,0 +1,17 @@
+package net.corda.notary.jpa
+
+import net.corda.core.schemas.MappedSchema
+
+object JPANotarySchema
+
+object JPANotarySchemaV1 : MappedSchema(
+ schemaFamily = JPANotarySchema.javaClass,
+ version = 1,
+ mappedTypes = listOf(
+ JPAUniquenessProvider.CommittedState::class.java,
+ JPAUniquenessProvider.Request::class.java
+ )
+) {
+ override val migrationResource: String?
+ get() = "notary-jpa.changelog-master"
+}
diff --git a/notary/jpa/src/main/resources/migration/notary-jpa.changelog-init.xml b/notary/jpa/src/main/resources/migration/notary-jpa.changelog-init.xml
new file mode 100644
index 0000000000..40a588ceda
--- /dev/null
+++ b/notary/jpa/src/main/resources/migration/notary-jpa.changelog-init.xml
@@ -0,0 +1,34 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/notary/jpa/src/main/resources/migration/notary-jpa.changelog-master.xml b/notary/jpa/src/main/resources/migration/notary-jpa.changelog-master.xml
new file mode 100644
index 0000000000..0203608b62
--- /dev/null
+++ b/notary/jpa/src/main/resources/migration/notary-jpa.changelog-master.xml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
diff --git a/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt b/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt
new file mode 100644
index 0000000000..068a2c416d
--- /dev/null
+++ b/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt
@@ -0,0 +1,102 @@
+package net.corda.notary.jpa
+
+import net.corda.core.crypto.DigitalSignature
+import net.corda.core.crypto.NullKeys
+import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.sha256
+import net.corda.core.flows.NotarisationRequestSignature
+import net.corda.core.flows.NotaryError
+import net.corda.core.identity.CordaX500Name
+import net.corda.core.internal.notary.NotaryInternalException
+import net.corda.node.internal.configureDatabase
+import net.corda.node.services.config.NotaryConfig
+import net.corda.node.services.schema.NodeSchemaService
+import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef
+import net.corda.notary.jpa.JPAUniquenessProvider.Companion.encodeStateRef
+import net.corda.nodeapi.internal.persistence.CordaPersistence
+import net.corda.nodeapi.internal.persistence.DatabaseConfig
+import net.corda.testing.core.SerializationEnvironmentRule
+import net.corda.testing.core.TestIdentity
+import net.corda.testing.core.generateStateRef
+import net.corda.testing.internal.LogHelper
+import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
+import org.junit.After
+import org.junit.Before
+import org.junit.Rule
+import org.junit.Test
+import java.time.Clock
+import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
+
+class JPAUniquenessProviderTests {
+ @Rule
+ @JvmField
+ val testSerialization = SerializationEnvironmentRule(inheritable = true)
+ private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
+ private val txID = SecureHash.randomSHA256()
+ private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0)
+ private val notaryConfig = NotaryConfig(validating=false, maxInputStates = 10)
+
+ private lateinit var database: CordaPersistence
+
+ @Before
+ fun setUp() {
+ LogHelper.setLevel(JPAUniquenessProvider::class)
+ database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), { null }, { null }, NodeSchemaService(extraSchemas = setOf(JPANotarySchemaV1)))
+ }
+
+ @After
+ fun tearDown() {
+ database.close()
+ LogHelper.reset(JPAUniquenessProvider::class)
+ }
+
+ @Test
+ fun `should commit a transaction with unused inputs without exception`() {
+ val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
+ val inputState = generateStateRef()
+
+ provider.commit(listOf(inputState), txID, identity, requestSignature)
+ }
+
+ @Test
+ fun `should report a conflict for a transaction with previously used inputs`() {
+ val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
+ val inputState = generateStateRef()
+
+ val inputs = listOf(inputState)
+ val firstTxId = txID
+ provider.commit(inputs, firstTxId, identity, requestSignature)
+
+ val secondTxId = SecureHash.randomSHA256()
+ val ex = assertFailsWith {
+ provider.commit(inputs, secondTxId, identity, requestSignature)
+ }
+ val error = ex.error as NotaryError.Conflict
+
+ val conflictCause = error.consumedStates[inputState]!!
+ assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
+ }
+
+ @Test
+ fun `serializes and deserializes state ref`() {
+ val stateRef = generateStateRef()
+ assertEquals(stateRef, decodeStateRef(encodeStateRef(stateRef)))
+ }
+
+ @Test
+ fun `all conflicts are found with batching`() {
+ val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates/2
+ val stateRefs = (1..nrStates).map { generateStateRef() }
+ println(stateRefs.size)
+ val firstTxId = SecureHash.randomSHA256()
+ val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
+ provider.commit(stateRefs, firstTxId, identity, requestSignature)
+ val secondTxId = SecureHash.randomSHA256()
+ val ex = assertFailsWith {
+ provider.commit(stateRefs, secondTxId, identity, requestSignature)
+ }
+ val error = ex.error as NotaryError.Conflict
+ assertEquals(nrStates, error.consumedStates.size)
+ }
+}
diff --git a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/flows/DummyIssueAndMove.kt b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/flows/DummyIssueAndMove.kt
index 5ea916d51e..a14d5c0768 100644
--- a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/flows/DummyIssueAndMove.kt
+++ b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/flows/DummyIssueAndMove.kt
@@ -1,6 +1,7 @@
package net.corda.notarydemo.flows
import co.paralleluniverse.fibers.Suspendable
+import net.corda.core.contracts.BelongsToContract
import net.corda.core.contracts.CommandData
import net.corda.core.contracts.Contract
import net.corda.core.contracts.ContractState
@@ -24,6 +25,7 @@ class DummyIssueAndMove(private val notary: Party, private val counterpartyNode:
data class DummyCommand(val dummy: Int = 0) : CommandData
+ @BelongsToContract(DoNothingContract::class)
data class State(override val participants: List, val discriminator: Int) : ContractState
@Suspendable
diff --git a/settings.gradle b/settings.gradle
index 9f8e9be8b6..64e2b8b955 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -82,6 +82,7 @@ include 'node:dist'
include 'tools:notary-healthcheck:contract'
include 'tools:notary-healthcheck:cordapp'
include 'tools:notary-healthcheck:client'
+include 'notary:jpa'
include 'notary:mysql'
apply from: 'buildCacheSettings.gradle'