mirror of
https://github.com/corda/corda.git
synced 2025-03-15 00:36:49 +00:00
Thomas/ent 2510 single column index (#1431)
This commit is contained in:
parent
53473aedba
commit
70e24b79fb
2
.idea/compiler.xml
generated
2
.idea/compiler.xml
generated
@ -187,6 +187,8 @@
|
||||
<module name="jfx_test" target="1.8" />
|
||||
<module name="jmeter_main" target="1.8" />
|
||||
<module name="jmeter_test" target="1.8" />
|
||||
<module name="jpa_main" target="1.8" />
|
||||
<module name="jpa_test" target="1.8" />
|
||||
<module name="kryo-hook_main" target="1.8" />
|
||||
<module name="kryo-hook_test" target="1.8" />
|
||||
<module name="launcher_main" target="1.8" />
|
||||
|
@ -1,12 +1,15 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowAsyncOperation
|
||||
import net.corda.core.internal.executeAsync
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
|
||||
@ -16,6 +19,25 @@ abstract class AsyncCFTNotaryService : TrustedAuthorityNotaryService() {
|
||||
/** A uniqueness provider that supports asynchronous commits. */
|
||||
protected abstract val asyncUniquenessProvider: AsyncUniquenessProvider
|
||||
|
||||
/**
|
||||
* @throws NotaryInternalException if any of the states have been consumed by a different transaction.
|
||||
*/
|
||||
@Suspendable
|
||||
override fun commitInputStates(
|
||||
inputs: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
caller: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
) {
|
||||
// TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance
|
||||
// when we concurrently log requests here as part of the flows, instead of logging sequentially in the
|
||||
// `UniquenessProvider`.
|
||||
val result = FlowLogic.currentTopLevel!!.executeAsync(AsyncCFTNotaryService.CommitOperation(this, inputs, txId, caller, requestSignature, timeWindow, references))
|
||||
if (result is Result.Failure) throw NotaryInternalException(result.error)
|
||||
}
|
||||
|
||||
/**
|
||||
* Commits the provided input states asynchronously.
|
||||
*
|
||||
|
@ -18,7 +18,7 @@ interface AsyncUniquenessProvider : UniquenessProvider {
|
||||
|
||||
/** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>) {
|
||||
val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow,references).get()
|
||||
val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow, references).get()
|
||||
if (result is Result.Failure) {
|
||||
throw NotaryInternalException(result.error)
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.IdempotentFlow
|
||||
import net.corda.core.internal.executeAsync
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
|
||||
import net.corda.core.utilities.unwrap
|
||||
@ -19,7 +20,7 @@ import net.corda.core.utilities.unwrap
|
||||
* Additional transaction validation logic can be added when implementing [validateRequest].
|
||||
*/
|
||||
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic<Void?>() {
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic<Void?>(), IdempotentFlow {
|
||||
companion object {
|
||||
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
|
||||
private const val maxAllowedInputsAndReferences = 10_000
|
||||
@ -36,12 +37,7 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
|
||||
val parts = validateRequest(requestPayload)
|
||||
txId = parts.id
|
||||
checkNotary(parts.notary)
|
||||
if (service is AsyncCFTNotaryService) {
|
||||
val result = executeAsync(AsyncCFTNotaryService.CommitOperation(service, parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references))
|
||||
if (result is Result.Failure) throw NotaryInternalException(result.error)
|
||||
} else {
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references)
|
||||
}
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references)
|
||||
signTransactionAndSendResponse(txId)
|
||||
} catch (e: NotaryInternalException) {
|
||||
throw NotaryException(e.error, txId)
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.*
|
||||
@ -22,11 +23,12 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
|
||||
protected abstract val uniquenessProvider: UniquenessProvider
|
||||
|
||||
/**
|
||||
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
|
||||
* @throws NotaryException if any of the states have been consumed by a different transaction. Note that
|
||||
* this method does not throw an exception when input states are present multiple times within the transaction.
|
||||
*/
|
||||
@JvmOverloads
|
||||
fun commitInputStates(
|
||||
@Suspendable
|
||||
open fun commitInputStates(
|
||||
inputs: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
caller: Party,
|
||||
|
@ -138,7 +138,12 @@ data class NotaryConfig(val validating: Boolean,
|
||||
val custom: Boolean = false,
|
||||
val mysql: MySQLConfiguration? = null,
|
||||
val serviceLegalName: CordaX500Name? = null,
|
||||
val className: String = "net.corda.node.services.transactions.SimpleNotaryService"
|
||||
val className: String = "net.corda.node.services.transactions.SimpleNotaryService",
|
||||
val batchSize: Int = 128,
|
||||
val batchTimeoutMs: Long = 1L,
|
||||
val maxInputStates: Int = 2000,
|
||||
val maxDBTransactionRetryCount: Int = 10,
|
||||
val backOffBaseMs: Long = 20L
|
||||
) {
|
||||
init {
|
||||
require(raft == null || bftSMaRt == null || !custom || mysql == null) {
|
||||
|
@ -17,7 +17,6 @@ import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.persistence.RunOnceService
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
||||
import net.corda.node.services.vault.VaultSchemaV1
|
||||
|
||||
|
@ -37,4 +37,4 @@ object NodeNotarySchemaV1 : MappedSchema(schemaFamily = NodeNotarySchema.javaCla
|
||||
PersistentUniquenessProvider.CommittedState::class.java
|
||||
)) {
|
||||
override val migrationResource = "node-notary.changelog-master"
|
||||
}
|
||||
}
|
||||
|
@ -55,4 +55,4 @@
|
||||
<addPrimaryKey columnNames="id" constraintName="node_notary_request_log_pkey"
|
||||
tableName="node_notary_request_log"/>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
||||
</databaseChangeLog>
|
||||
|
@ -5,8 +5,6 @@
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<include file="migration/node-notary.changelog-init.xml"/>
|
||||
|
||||
<include file="migration/node-notary.changelog-v1.xml"/>
|
||||
<include file="migration/node-notary.changelog-pkey.xml"/>
|
||||
|
||||
</databaseChangeLog>
|
||||
|
@ -59,19 +59,21 @@ class PersistentUniquenessProviderTests {
|
||||
@Test
|
||||
fun `should report a conflict for a transaction with previously used inputs`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState = generateStateRef()
|
||||
val inputState = generateStateRef()
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
val firstTxId = txID
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature)
|
||||
val inputs = listOf(inputState)
|
||||
val firstTxId = txID
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature)
|
||||
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(inputs, secondTxId, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature)
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(inputs, secondTxId, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||
}
|
||||
}
|
||||
|
27
notary/jpa/build.gradle
Normal file
27
notary/jpa/build.gradle
Normal file
@ -0,0 +1,27 @@
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: 'kotlin-jpa'
|
||||
apply plugin: 'idea'
|
||||
apply plugin: 'net.corda.plugins.cordapp'
|
||||
apply plugin: 'net.corda.plugins.publish-utils'
|
||||
apply plugin: 'net.corda.plugins.quasar-utils'
|
||||
|
||||
dependencies {
|
||||
cordaCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
|
||||
|
||||
// Corda integration dependencies
|
||||
cordaCompile project(':node')
|
||||
|
||||
testCompile "junit:junit:$junit_version"
|
||||
testCompile project(':node-driver')
|
||||
}
|
||||
|
||||
idea {
|
||||
module {
|
||||
downloadJavadoc = true // defaults to false
|
||||
downloadSources = true
|
||||
}
|
||||
}
|
||||
|
||||
publish {
|
||||
name 'corda-notary-jpa'
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
package net.corda.notary.jpa
|
||||
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.AsyncCFTNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.NonValidatingNotaryFlow
|
||||
import net.corda.node.services.transactions.ValidatingNotaryFlow
|
||||
import java.security.PublicKey
|
||||
|
||||
/** Notary service backed by a replicated MySQL database. */
|
||||
class JPANotaryService(
|
||||
override val services: ServiceHubInternal,
|
||||
override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() {
|
||||
|
||||
private val notaryConfig = services.configuration.notary
|
||||
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
|
||||
|
||||
override val asyncUniquenessProvider = with(services) {
|
||||
JPAUniquenessProvider(services.clock, services.database, notaryConfig)
|
||||
}
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {
|
||||
return if (notaryConfig.validating) {
|
||||
ValidatingNotaryFlow(otherPartySession, this)
|
||||
} else NonValidatingNotaryFlow(otherPartySession, this)
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
asyncUniquenessProvider.stop()
|
||||
}
|
||||
}
|
@ -0,0 +1,281 @@
|
||||
package net.corda.notary.jpa
|
||||
|
||||
import com.google.common.collect.Queues
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.*
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.serialization.internal.CordaSerializationEncoding
|
||||
import org.hibernate.Session
|
||||
import java.sql.SQLException
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.*
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
/** A JPA backed Uniqueness provider */
|
||||
@ThreadSafe
|
||||
class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val notaryConfig: NotaryConfig) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
|
||||
|
||||
// TODO: test vs. MySQLUniquenessProvider
|
||||
|
||||
// This is the prefix of the ID in the request log table, to allow running multiple instances that access the
|
||||
// same table.
|
||||
val instanceId = UUID.randomUUID().toString()
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_request_log")
|
||||
@CordaSerializable
|
||||
class Request(
|
||||
@Id
|
||||
@Column(nullable = true, length=76)
|
||||
var id: String? = null,
|
||||
|
||||
@Column(name = "consuming_transaction_id", nullable = true, length = 64)
|
||||
val consumingTxHash: String?,
|
||||
|
||||
@Column(name = "requesting_party_name", nullable = true, length = 255)
|
||||
var partyName: String?,
|
||||
|
||||
@Lob
|
||||
@Column(name = "request_signature", nullable = false)
|
||||
val requestSignature: ByteArray,
|
||||
|
||||
@Column(name = "request_timestamp", nullable = false)
|
||||
var requestDate: Instant
|
||||
)
|
||||
|
||||
private data class CommitRequest(
|
||||
val states: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
val callerIdentity: Party,
|
||||
val requestSignature: NotarisationRequestSignature,
|
||||
val timeWindow: TimeWindow?,
|
||||
val references: List<StateRef>,
|
||||
val future: OpenFuture<AsyncUniquenessProvider.Result>,
|
||||
val requestEntity: Request,
|
||||
val committedStatesEntities: List<CommittedState>)
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_committed_states")
|
||||
@NamedQuery(name = "CommittedState.select", query="SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids")
|
||||
class CommittedState(
|
||||
@Id
|
||||
@Column(name = "state_ref", length = 73)
|
||||
val id: String,
|
||||
@Column(name = "consuming_transaction_id", nullable = false, length = 64)
|
||||
val consumingTxHash: String)
|
||||
|
||||
private val requestQueue = LinkedBlockingQueue<CommitRequest>(requestQueueSize)
|
||||
|
||||
// TODO: Collect metrics.
|
||||
|
||||
/** A requestEntitiy processor thread. */
|
||||
private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) {
|
||||
try {
|
||||
val buffer = LinkedList<CommitRequest>()
|
||||
while (!Thread.interrupted()) {
|
||||
val drainedSize = Queues.drain(requestQueue, buffer, notaryConfig.batchSize, notaryConfig.batchTimeoutMs, TimeUnit.MILLISECONDS)
|
||||
if (drainedSize == 0) continue
|
||||
processRequests(buffer)
|
||||
buffer.clear()
|
||||
}
|
||||
} catch (e: InterruptedException) {
|
||||
}
|
||||
log.debug { "Shutting down with ${requestQueue.size} in-flight requests unprocessed." }
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
processorThread.interrupt()
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val requestQueueSize = 100_000
|
||||
private const val jdbcBatchSize = 100_000
|
||||
private val log = contextLogger()
|
||||
|
||||
fun encodeStateRef(s: StateRef): String {
|
||||
return s.txhash.toString() + ":" + s.index.toString(16)
|
||||
}
|
||||
|
||||
fun decodeStateRef(s: String): StateRef {
|
||||
return StateRef(txhash = SecureHash.parse(s.take(64)), index = s.substring(65).toInt(16))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates and adds a [CommitRequest] to the requestEntitiy queue. If the requestEntitiy queue is full, this method will block
|
||||
* until space is available.
|
||||
*
|
||||
* Returns a future that will complete once the requestEntitiy is processed, containing the commit [Result].
|
||||
*/
|
||||
override fun commitAsync(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
): CordaFuture<AsyncUniquenessProvider.Result> {
|
||||
val future = openFuture<AsyncUniquenessProvider.Result>()
|
||||
val requestEntities = Request(consumingTxHash = txId.toString(),
|
||||
partyName = callerIdentity.name.toString(),
|
||||
requestSignature = requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY)).bytes,
|
||||
requestDate = clock.instant())
|
||||
val stateEntities = states.map { CommittedState(encodeStateRef(it), txId.toString()) }
|
||||
val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future, requestEntities, stateEntities)
|
||||
requestQueue.put(request)
|
||||
return future
|
||||
}
|
||||
|
||||
// Safe up to 100k requests per second.
|
||||
private var nextRequestId = System.currentTimeMillis() * 100
|
||||
|
||||
private fun logRequests(requests: List<CommitRequest>) {
|
||||
database.transaction {
|
||||
for (request in requests) {
|
||||
request.requestEntity.id = instanceId + (nextRequestId++).toString(16)
|
||||
session.persist(request.requestEntity)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun commitRequests(session: Session, requests: List<CommitRequest>) {
|
||||
for (request in requests) {
|
||||
for (cs in request.committedStatesEntities) {
|
||||
session.persist(cs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun findAlreadyCommitted(session: Session, states: List<StateRef>, references: List<StateRef>): Map<StateRef, StateConsumptionDetails> {
|
||||
val ids = (states + references).map { encodeStateRef(it) }.toSet()
|
||||
val committedStates = mutableListOf<CommittedState>()
|
||||
|
||||
for (idsBatch in ids.chunked(notaryConfig.maxInputStates)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
val existing = session.createNamedQuery("CommittedState.select").setParameter("ids", idsBatch).resultList as List<CommittedState>
|
||||
committedStates.addAll(existing)
|
||||
}
|
||||
|
||||
return committedStates.map {
|
||||
val stateRef = decodeStateRef(it.id)
|
||||
val consumingTxId = SecureHash.parse(it.consumingTxHash)
|
||||
if (stateRef in references) {
|
||||
stateRef to StateConsumptionDetails(hashOfTransactionId = consumingTxId, type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
|
||||
} else {
|
||||
stateRef to StateConsumptionDetails(consumingTxId.sha256())
|
||||
}
|
||||
}.toMap()
|
||||
}
|
||||
|
||||
private fun withRetry(block: () -> Unit) {
|
||||
var retryCount = 0
|
||||
var backOff = notaryConfig.backOffBaseMs
|
||||
while (retryCount < notaryConfig.maxDBTransactionRetryCount) {
|
||||
try {
|
||||
block()
|
||||
break
|
||||
} catch (e: SQLException) {
|
||||
retryCount++
|
||||
Thread.sleep(backOff)
|
||||
backOff *= 2
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun findAllConflicts(session: Session, requests: List<CommitRequest>): MutableMap<StateRef, StateConsumptionDetails> {
|
||||
val allInputs = requests.flatMap { it.states }
|
||||
val references = requests.flatMap { it.references }
|
||||
log.info("Processing notarization requests with ${allInputs.size} input states and ${references.size} references")
|
||||
|
||||
return findAlreadyCommitted(session, allInputs, references).toMutableMap()
|
||||
}
|
||||
|
||||
private fun processRequest(request: CommitRequest, allConflicts: MutableMap<StateRef, StateConsumptionDetails>, toCommit: MutableList<CommitRequest>): AsyncUniquenessProvider.Result {
|
||||
|
||||
val conflicts = (request.states + request.references).mapNotNull {
|
||||
if (allConflicts.containsKey(it)) it to allConflicts[it]!!
|
||||
else null
|
||||
}.toMap()
|
||||
val result = if (conflicts.isNotEmpty()) {
|
||||
if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) {
|
||||
AsyncUniquenessProvider.Result.Success
|
||||
} else {
|
||||
AsyncUniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts))
|
||||
}
|
||||
} else {
|
||||
val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow)
|
||||
if (outsideTimeWindowError == null) {
|
||||
toCommit.add(request)
|
||||
// Mark states as consumed to capture conflicting transactions in the same batch.
|
||||
request.states.forEach {
|
||||
allConflicts[it] = StateConsumptionDetails(request.txId.sha256())
|
||||
}
|
||||
AsyncUniquenessProvider.Result.Success
|
||||
} else {
|
||||
AsyncUniquenessProvider.Result.Failure(outsideTimeWindowError)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
private fun processRequests(requests: List<CommitRequest>) {
|
||||
try {
|
||||
// Note that there is an additional retry mechanism within the transaction itself.
|
||||
withRetry {
|
||||
database.transaction {
|
||||
val em = session.entityManagerFactory.createEntityManager()
|
||||
em.unwrap(Session::class.java).jdbcBatchSize = jdbcBatchSize
|
||||
|
||||
val toCommit = mutableListOf<CommitRequest>()
|
||||
val allConflicts = findAllConflicts(session, requests)
|
||||
|
||||
val results = requests.map { request ->
|
||||
processRequest(request, allConflicts, toCommit)
|
||||
}
|
||||
logRequests(requests)
|
||||
commitRequests(session, toCommit)
|
||||
|
||||
for ((request, result) in requests.zip(results)) {
|
||||
request.future.set(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
log.warn("Error processing commit requests", e)
|
||||
for (request in requests) {
|
||||
respondWithError(request, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun respondWithError(request: CommitRequest, exception: Exception) {
|
||||
if (exception is NotaryInternalException) {
|
||||
request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
|
||||
} else {
|
||||
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
|
||||
}
|
||||
}
|
||||
}
|
17
notary/jpa/src/main/kotlin/net/corda/notary/jpa/Schema.kt
Normal file
17
notary/jpa/src/main/kotlin/net/corda/notary/jpa/Schema.kt
Normal file
@ -0,0 +1,17 @@
|
||||
package net.corda.notary.jpa
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
|
||||
object JPANotarySchema
|
||||
|
||||
object JPANotarySchemaV1 : MappedSchema(
|
||||
schemaFamily = JPANotarySchema.javaClass,
|
||||
version = 1,
|
||||
mappedTypes = listOf(
|
||||
JPAUniquenessProvider.CommittedState::class.java,
|
||||
JPAUniquenessProvider.Request::class.java
|
||||
)
|
||||
) {
|
||||
override val migrationResource: String?
|
||||
get() = "notary-jpa.changelog-master"
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
|
||||
logicalFilePath="migration/node-services.changelog-init.xml">
|
||||
<changeSet author="R3.Corda" id="create-node-jpa-notary-committed-states">
|
||||
<createTable tableName="node_jpa_notary_committed_states">
|
||||
<column name="state_ref" type="NVARCHAR(73)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="consuming_transaction_id" type="NVARCHAR(64)"/>
|
||||
</createTable>
|
||||
<addPrimaryKey columnNames="state_ref" constraintName="node_jpa_notary_committed_states_pkey" tableName="node_jpa_notary_committed_states"/>
|
||||
<createTable tableName="node_jpa_notary_request_log">
|
||||
<column name="id" type="NVARCHAR(76)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="consuming_transaction_id" type="NVARCHAR(64)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="requesting_party_name" type="NVARCHAR(255)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="request_timestamp" type="TIMESTAMP">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="request_signature" type="BLOB">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
</createTable>
|
||||
<addPrimaryKey columnNames="id" constraintName="node_jpa_notary_request_log_pkey" tableName="node_jpa_notary_request_log"/>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
@ -0,0 +1,9 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<include file="migration/node-notary-jpa.changelog-init.xml"/>
|
||||
|
||||
</databaseChangeLog>
|
@ -0,0 +1,102 @@
|
||||
package net.corda.notary.jpa
|
||||
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.NullKeys
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef
|
||||
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.encodeStateRef
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.core.generateStateRef
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.time.Clock
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
class JPAUniquenessProviderTests {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(inheritable = true)
|
||||
private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
|
||||
private val txID = SecureHash.randomSHA256()
|
||||
private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0)
|
||||
private val notaryConfig = NotaryConfig(validating=false, maxInputStates = 10)
|
||||
|
||||
private lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(JPAUniquenessProvider::class)
|
||||
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), { null }, { null }, NodeSchemaService(extraSchemas = setOf(JPANotarySchemaV1)))
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
database.close()
|
||||
LogHelper.reset(JPAUniquenessProvider::class)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should commit a transaction with unused inputs without exception`() {
|
||||
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
|
||||
val inputState = generateStateRef()
|
||||
|
||||
provider.commit(listOf(inputState), txID, identity, requestSignature)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should report a conflict for a transaction with previously used inputs`() {
|
||||
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
|
||||
val inputState = generateStateRef()
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
val firstTxId = txID
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature)
|
||||
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(inputs, secondTxId, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `serializes and deserializes state ref`() {
|
||||
val stateRef = generateStateRef()
|
||||
assertEquals(stateRef, decodeStateRef(encodeStateRef(stateRef)))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `all conflicts are found with batching`() {
|
||||
val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates/2
|
||||
val stateRefs = (1..nrStates).map { generateStateRef() }
|
||||
println(stateRefs.size)
|
||||
val firstTxId = SecureHash.randomSHA256()
|
||||
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
|
||||
provider.commit(stateRefs, firstTxId, identity, requestSignature)
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(stateRefs, secondTxId, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
assertEquals(nrStates, error.consumedStates.size)
|
||||
}
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package net.corda.notarydemo.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.BelongsToContract
|
||||
import net.corda.core.contracts.CommandData
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
@ -24,6 +25,7 @@ class DummyIssueAndMove(private val notary: Party, private val counterpartyNode:
|
||||
|
||||
data class DummyCommand(val dummy: Int = 0) : CommandData
|
||||
|
||||
@BelongsToContract(DoNothingContract::class)
|
||||
data class State(override val participants: List<AbstractParty>, val discriminator: Int) : ContractState
|
||||
|
||||
@Suspendable
|
||||
|
@ -82,6 +82,7 @@ include 'node:dist'
|
||||
include 'tools:notary-healthcheck:contract'
|
||||
include 'tools:notary-healthcheck:cordapp'
|
||||
include 'tools:notary-healthcheck:client'
|
||||
include 'notary:jpa'
|
||||
include 'notary:mysql'
|
||||
|
||||
apply from: 'buildCacheSettings.gradle'
|
||||
|
Loading…
x
Reference in New Issue
Block a user