ENT-2769 Handle reference states in the JPAUniquenessProvider (#1586)

* Handle reference states in the JPAUniquenessProvider

* Remove unused import
This commit is contained in:
Thomas Schroeter 2018-11-22 19:25:13 +00:00 committed by GitHub
parent 89ea638d05
commit df2c313f23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 139 additions and 7 deletions

View File

@ -57,7 +57,10 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
Pair(NodeCoreV1, SchemaOptions())) Pair(NodeCoreV1, SchemaOptions()))
fun internalSchemas() = requiredSchemas.keys + extraSchemas.filter { schema -> // when mapped schemas from the finance module are present, they are considered as internal ones fun internalSchemas() = requiredSchemas.keys + extraSchemas.filter { schema -> // when mapped schemas from the finance module are present, they are considered as internal ones
schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" || schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" } schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" ||
schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" ||
schema::class.qualifiedName == "net.corda.notary.jpa.JPANotarySchemaV1"
}
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas + extraSchemas.associateBy({ it }, { SchemaOptions() }) override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas + extraSchemas.associateBy({ it }, { SchemaOptions() })

View File

@ -89,6 +89,13 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
@Column(name = "consuming_transaction_id", nullable = false, length = 64) @Column(name = "consuming_transaction_id", nullable = false, length = 64)
val consumingTxHash: String) val consumingTxHash: String)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_committed_transactions")
class CommittedTransaction(
@Id
val transactionId: String
)
private val requestQueue = LinkedBlockingQueue<CommitRequest>(requestQueueSize) private val requestQueue = LinkedBlockingQueue<CommitRequest>(requestQueueSize)
// TODO: Collect metrics. // TODO: Collect metrics.
@ -168,6 +175,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
for (cs in request.committedStatesEntities) { for (cs in request.committedStatesEntities) {
session.persist(cs) session.persist(cs)
} }
session.persist(CommittedTransaction(request.txId.toString()))
} }
} }
@ -185,7 +193,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
val stateRef = decodeStateRef(it.id) val stateRef = decodeStateRef(it.id)
val consumingTxId = SecureHash.parse(it.consumingTxHash) val consumingTxId = SecureHash.parse(it.consumingTxHash)
if (stateRef in references) { if (stateRef in references) {
stateRef to StateConsumptionDetails(hashOfTransactionId = consumingTxId, type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) stateRef to StateConsumptionDetails(consumingTxId.sha256(), type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
} else { } else {
stateRef to StateConsumptionDetails(consumingTxId.sha256()) stateRef to StateConsumptionDetails(consumingTxId.sha256())
} }
@ -215,17 +223,22 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
return findAlreadyCommitted(session, allInputs, references).toMutableMap() return findAlreadyCommitted(session, allInputs, references).toMutableMap()
} }
private fun processRequest(request: CommitRequest, allConflicts: MutableMap<StateRef, StateConsumptionDetails>, toCommit: MutableList<CommitRequest>): UniquenessProvider.Result { private fun processRequest(session: Session, request: CommitRequest, allConflicts: MutableMap<StateRef, StateConsumptionDetails>, toCommit: MutableList<CommitRequest>): UniquenessProvider.Result {
val conflicts = (request.states + request.references).mapNotNull { val conflicts = (request.states + request.references).mapNotNull {
if (allConflicts.containsKey(it)) it to allConflicts[it]!! if (allConflicts.containsKey(it)) it to allConflicts[it]!!
else null else null
}.toMap() }.toMap()
val result = if (conflicts.isNotEmpty()) { val result = if (conflicts.isNotEmpty()) {
if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) { if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) {
UniquenessProvider.Result.Success UniquenessProvider.Result.Success
} else { } else {
UniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts)) if (request.states.isEmpty() && isPreviouslyNotarised(session, request.txId)) {
UniquenessProvider.Result.Success
} else {
UniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts))
}
} }
} else { } else {
val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow) val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow)
@ -237,12 +250,20 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
} }
UniquenessProvider.Result.Success UniquenessProvider.Result.Success
} else { } else {
UniquenessProvider.Result.Failure(outsideTimeWindowError) if (request.states.isEmpty() && isPreviouslyNotarised(session, request.txId)) {
UniquenessProvider.Result.Success
} else {
UniquenessProvider.Result.Failure(outsideTimeWindowError)
}
} }
} }
return result return result
} }
private fun isPreviouslyNotarised(session: Session, txId: SecureHash): Boolean {
return session.find(CommittedTransaction::class.java, txId.toString()) != null
}
private fun processRequests(requests: List<CommitRequest>) { private fun processRequests(requests: List<CommitRequest>) {
try { try {
// Note that there is an additional retry mechanism within the transaction itself. // Note that there is an additional retry mechanism within the transaction itself.
@ -255,7 +276,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
val allConflicts = findAllConflicts(session, requests) val allConflicts = findAllConflicts(session, requests)
val results = requests.map { request -> val results = requests.map { request ->
processRequest(request, allConflicts, toCommit) processRequest(session, request, allConflicts, toCommit)
} }
logRequests(requests) logRequests(requests)
commitRequests(session, toCommit) commitRequests(session, toCommit)

View File

@ -9,7 +9,8 @@ object JPANotarySchemaV1 : MappedSchema(
version = 1, version = 1,
mappedTypes = listOf( mappedTypes = listOf(
JPAUniquenessProvider.CommittedState::class.java, JPAUniquenessProvider.CommittedState::class.java,
JPAUniquenessProvider.Request::class.java JPAUniquenessProvider.Request::class.java,
JPAUniquenessProvider.CommittedTransaction::class.java
) )
) { ) {
override val migrationResource: String? override val migrationResource: String?

View File

@ -0,0 +1,14 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="create-node-jpa-notary-committed-transactions">
<createTable tableName="node_jpa_notary_committed_transactions">
<column name="transaction_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="transaction_id" constraintName="node_jpa_notary_committed_transactions_pkey" tableName="node_jpa_notary_committed_transaction"/>
</changeSet>
</databaseChangeLog>

View File

@ -4,5 +4,6 @@
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"> xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<include file="migration/notary-jpa.changelog-init.xml"/> <include file="migration/notary-jpa.changelog-init.xml"/>
<include file="migration/notary-jpa.changelog-create-committed-transactions-table.xml"/>
</databaseChangeLog> </databaseChangeLog>

View File

@ -1,13 +1,16 @@
package net.corda.notary.jpa package net.corda.notary.jpa
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.NullKeys import net.corda.core.crypto.NullKeys
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256 import net.corda.core.crypto.sha256
import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryError
import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.utilities.minutes
import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
@ -19,6 +22,7 @@ import net.corda.testing.core.generateStateRef
import net.corda.testing.internal.LogHelper import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.TestClock
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Rule import org.junit.Rule
@ -99,4 +103,92 @@ class JPAUniquenessProviderTests {
val error = (secondResult as UniquenessProvider.Result.Failure).error as NotaryError.Conflict val error = (secondResult as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
assertEquals(nrStates, error.consumedStates.size) assertEquals(nrStates, error.consumedStates.size)
} }
@Test
fun `handles reference states`() {
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
val inputState1 = generateStateRef()
val inputState2 = generateStateRef()
val firstTxId = SecureHash.randomSHA256()
val secondTxId = SecureHash.randomSHA256()
// Conflict free transaction goes through.
val result1 = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, references = listOf(inputState2)).get()
assertEquals(UniquenessProvider.Result.Success, result1)
// Referencing a spent state results in a conflict.
val result2 = provider.commit(listOf(inputState2), secondTxId, identity, requestSignature, references = listOf(inputState1)).get()
val error = (result2 as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
val conflictCause = error.consumedStates[inputState1]!!
assertEquals(firstTxId.sha256(), conflictCause.hashOfTransactionId)
assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type)
// Re-notarisation works.
val result3 = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, references = listOf(inputState2)).get()
assertEquals(UniquenessProvider.Result.Success, result3)
}
@Test
fun `handles transaction with reference states only`() {
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
val inputState1 = generateStateRef()
val firstTxId = SecureHash.randomSHA256()
val secondTxId = SecureHash.randomSHA256()
val thirdTxId = SecureHash.randomSHA256()
// Conflict free transaction goes through.
val result1 = provider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(inputState1)).get()
assertEquals(UniquenessProvider.Result.Success, result1)
// Commit state 1.
val result2 = provider.commit(listOf(inputState1), secondTxId, identity, requestSignature).get()
assertEquals(UniquenessProvider.Result.Success, result2)
// Re-notarisation works.
val result3 = provider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(inputState1)).get()
assertEquals(UniquenessProvider.Result.Success, result3)
// Transaction referencing the spent sate fails.
val result4 = provider.commit(emptyList(), thirdTxId, identity, requestSignature, references = listOf(inputState1)).get()
val error = (result4 as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
val conflictCause = error.consumedStates[inputState1]!!
assertEquals(conflictCause.hashOfTransactionId, secondTxId.sha256())
assertEquals(conflictCause.type, StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
}
@Test
fun `rejects transaction with invalid time window`() {
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
val inputState1 = generateStateRef()
val firstTxId = SecureHash.randomSHA256()
val timeWindow = TimeWindow.fromOnly(Clock.systemUTC().instant().plus(30.minutes))
val result = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get()
val error = (result as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid
assertEquals(timeWindow, error.txTimeWindow)
}
@Test
fun `handles transaction with valid time window`() {
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
val inputState1 = generateStateRef()
val firstTxId = SecureHash.randomSHA256()
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
val result = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get()
assertEquals(UniquenessProvider.Result.Success, result)
}
@Test
fun `handles transaction with valid time window without inputs`() {
val testClock = TestClock(Clock.systemUTC())
val provider = JPAUniquenessProvider(testClock, database, notaryConfig)
val firstTxId = SecureHash.randomSHA256()
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
val result = provider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
assertEquals(UniquenessProvider.Result.Success, result)
// Re-notarisation works outside the specified time window.
testClock.advanceBy(90.minutes)
val result2 = provider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
assertEquals(UniquenessProvider.Result.Success, result2)
}
} }