mirror of
https://github.com/corda/corda.git
synced 2025-01-31 08:25:50 +00:00
[ENT-2769] Handle reference states in the PersistentUniquenessProvider (#4275)
* Handle reference states in the PresistentUniquenessProvider * Complete test * Re-notarise outside time window without input states * Address comments * Add missing column name
This commit is contained in:
parent
e14421b7b4
commit
8f15cfaa28
@ -56,7 +56,10 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
|
|||||||
Pair(NodeCoreV1, SchemaOptions()))
|
Pair(NodeCoreV1, SchemaOptions()))
|
||||||
|
|
||||||
fun internalSchemas() = requiredSchemas.keys + extraSchemas.filter { schema -> // when mapped schemas from the finance module are present, they are considered as internal ones
|
fun internalSchemas() = requiredSchemas.keys + extraSchemas.filter { schema -> // when mapped schemas from the finance module are present, they are considered as internal ones
|
||||||
schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" || schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" }
|
schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" ||
|
||||||
|
schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" ||
|
||||||
|
schema::class.qualifiedName == "net.corda.node.services.transactions.NodeNotarySchemaV1"
|
||||||
|
}
|
||||||
|
|
||||||
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas + extraSchemas.associateBy({ it }, { SchemaOptions() })
|
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas + extraSchemas.associateBy({ it }, { SchemaOptions() })
|
||||||
|
|
||||||
|
@ -70,6 +70,14 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
|||||||
var requestDate: Instant
|
var requestDate: Instant
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@Entity
|
||||||
|
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_transactions")
|
||||||
|
class CommittedTransaction(
|
||||||
|
@Id
|
||||||
|
@Column(name = "transaction_id", nullable = false, length = 64)
|
||||||
|
val transactionId: String
|
||||||
|
)
|
||||||
|
|
||||||
private data class CommitRequest(
|
private data class CommitRequest(
|
||||||
val states: List<StateRef>,
|
val states: List<StateRef>,
|
||||||
val txId: SecureHash,
|
val txId: SecureHash,
|
||||||
@ -190,13 +198,32 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
|||||||
logRequest(txId, callerIdentity, requestSignature)
|
logRequest(txId, callerIdentity, requestSignature)
|
||||||
val conflictingStates = findAlreadyCommitted(states, references, commitLog)
|
val conflictingStates = findAlreadyCommitted(states, references, commitLog)
|
||||||
if (conflictingStates.isNotEmpty()) {
|
if (conflictingStates.isNotEmpty()) {
|
||||||
handleConflicts(txId, conflictingStates)
|
if (states.isEmpty()) {
|
||||||
|
handleReferenceConflicts(txId, conflictingStates)
|
||||||
|
} else {
|
||||||
|
handleConflicts(txId, conflictingStates)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
handleNoConflicts(timeWindow, states, txId, commitLog)
|
handleNoConflicts(timeWindow, states, txId, commitLog)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun previouslyCommitted(txId: SecureHash): Boolean {
|
||||||
|
val session = currentDBSession()
|
||||||
|
return session.find(CommittedTransaction::class.java, txId.toString()) != null
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun handleReferenceConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
|
||||||
|
val session = currentDBSession()
|
||||||
|
if (!previouslyCommitted(txId)) {
|
||||||
|
val conflictError = NotaryError.Conflict(txId, conflictingStates)
|
||||||
|
log.debug { "Failure, input states already committed: ${conflictingStates.keys}" }
|
||||||
|
throw NotaryInternalException(conflictError)
|
||||||
|
}
|
||||||
|
log.debug { "Transaction $txId already notarised" }
|
||||||
|
}
|
||||||
|
|
||||||
private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
|
private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
|
||||||
if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
|
if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
|
||||||
log.debug { "Transaction $txId already notarised" }
|
log.debug { "Transaction $txId already notarised" }
|
||||||
@ -214,8 +241,13 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
|||||||
states.forEach { stateRef ->
|
states.forEach { stateRef ->
|
||||||
commitLog[stateRef] = txId
|
commitLog[stateRef] = txId
|
||||||
}
|
}
|
||||||
|
val session = currentDBSession()
|
||||||
|
session.persist(CommittedTransaction(txId.toString()))
|
||||||
log.debug { "Successfully committed all input states: $states" }
|
log.debug { "Successfully committed all input states: $states" }
|
||||||
} else {
|
} else {
|
||||||
|
if (states.isEmpty() && previouslyCommitted(txId)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
throw NotaryInternalException(outsideTimeWindowError)
|
throw NotaryInternalException(outsideTimeWindowError)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,8 @@ object NodeNotarySchema
|
|||||||
object NodeNotarySchemaV1 : MappedSchema(schemaFamily = NodeNotarySchema.javaClass, version = 1,
|
object NodeNotarySchemaV1 : MappedSchema(schemaFamily = NodeNotarySchema.javaClass, version = 1,
|
||||||
mappedTypes = listOf(PersistentUniquenessProvider.BaseComittedState::class.java,
|
mappedTypes = listOf(PersistentUniquenessProvider.BaseComittedState::class.java,
|
||||||
PersistentUniquenessProvider.Request::class.java,
|
PersistentUniquenessProvider.Request::class.java,
|
||||||
PersistentUniquenessProvider.CommittedState::class.java
|
PersistentUniquenessProvider.CommittedState::class.java,
|
||||||
|
PersistentUniquenessProvider.CommittedTransaction::class.java
|
||||||
)) {
|
)) {
|
||||||
override val migrationResource = "node-notary.changelog-master"
|
override val migrationResource = "node-notary.changelog-master"
|
||||||
}
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||||
|
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||||
|
|
||||||
|
<changeSet author="R3.Corda" id="create-notary-committed-transactions-table">
|
||||||
|
<createTable tableName="node_notary_committed_transactions">
|
||||||
|
<column name="transaction_id" type="NVARCHAR(64)">
|
||||||
|
<constraints nullable="false"/>
|
||||||
|
</column>
|
||||||
|
</createTable>
|
||||||
|
<addPrimaryKey columnNames="transaction_id" constraintName="node_notary_transactions_pkey" tableName="node_notary_committed_transactions"/>
|
||||||
|
</changeSet>
|
||||||
|
</databaseChangeLog>
|
@ -8,5 +8,6 @@
|
|||||||
|
|
||||||
<include file="migration/node-notary.changelog-v1.xml"/>
|
<include file="migration/node-notary.changelog-v1.xml"/>
|
||||||
<include file="migration/node-notary.changelog-pkey.xml"/>
|
<include file="migration/node-notary.changelog-pkey.xml"/>
|
||||||
|
<include file="migration/node-notary.changelog-committed-transactions-table.xml" />
|
||||||
|
|
||||||
</databaseChangeLog>
|
</databaseChangeLog>
|
||||||
|
@ -1,14 +1,16 @@
|
|||||||
package net.corda.node.services.transactions
|
package net.corda.node.services.transactions
|
||||||
|
|
||||||
|
import net.corda.core.contracts.TimeWindow
|
||||||
import net.corda.core.crypto.DigitalSignature
|
import net.corda.core.crypto.DigitalSignature
|
||||||
import net.corda.core.crypto.NullKeys
|
import net.corda.core.crypto.NullKeys
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.crypto.sha256
|
import net.corda.core.crypto.sha256
|
||||||
import net.corda.core.flows.NotarisationRequestSignature
|
import net.corda.core.flows.NotarisationRequestSignature
|
||||||
import net.corda.core.flows.NotaryError
|
import net.corda.core.flows.NotaryError
|
||||||
|
import net.corda.core.flows.StateConsumptionDetails
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.internal.notary.NotaryInternalException
|
|
||||||
import net.corda.core.internal.notary.UniquenessProvider
|
import net.corda.core.internal.notary.UniquenessProvider
|
||||||
|
import net.corda.core.utilities.minutes
|
||||||
import net.corda.node.services.schema.NodeSchemaService
|
import net.corda.node.services.schema.NodeSchemaService
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||||
@ -19,13 +21,14 @@ import net.corda.testing.internal.LogHelper
|
|||||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||||
import net.corda.testing.internal.configureDatabase
|
import net.corda.testing.internal.configureDatabase
|
||||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||||
|
import net.corda.testing.node.TestClock
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
import org.junit.Before
|
import org.junit.Before
|
||||||
import org.junit.Rule
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.time.Clock
|
import java.time.Clock
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
import kotlin.test.assertFailsWith
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
class PersistentUniquenessProviderTests {
|
class PersistentUniquenessProviderTests {
|
||||||
@Rule
|
@Rule
|
||||||
@ -70,10 +73,98 @@ class PersistentUniquenessProviderTests {
|
|||||||
|
|
||||||
val secondTxId = SecureHash.randomSHA256()
|
val secondTxId = SecureHash.randomSHA256()
|
||||||
|
|
||||||
val response:UniquenessProvider.Result = provider.commit(inputs, secondTxId, identity, requestSignature).get()
|
val response: UniquenessProvider.Result = provider.commit(inputs, secondTxId, identity, requestSignature).get()
|
||||||
val error = (response as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
val error = (response as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||||
|
|
||||||
val conflictCause = error.consumedStates[inputState]!!
|
val conflictCause = error.consumedStates[inputState]!!
|
||||||
|
assertEquals(firstTxId.sha256(), conflictCause.hashOfTransactionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `rejects transaction with invalid time window`() {
|
||||||
|
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||||
|
val inputState1 = generateStateRef()
|
||||||
|
val firstTxId = SecureHash.randomSHA256()
|
||||||
|
val timeWindow = TimeWindow.fromOnly(Clock.systemUTC().instant().plus(30.minutes))
|
||||||
|
val result = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get()
|
||||||
|
val error = (result as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid
|
||||||
|
assertEquals(timeWindow, error.txTimeWindow)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `handles transaction with valid time window`() {
|
||||||
|
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||||
|
val inputState1 = generateStateRef()
|
||||||
|
val firstTxId = SecureHash.randomSHA256()
|
||||||
|
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
|
||||||
|
val result = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get()
|
||||||
|
assertEquals(UniquenessProvider.Result.Success, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `handles transaction with valid time window without inputs`() {
|
||||||
|
val testClock = TestClock(Clock.systemUTC())
|
||||||
|
val provider = PersistentUniquenessProvider(testClock, database, TestingNamedCacheFactory())
|
||||||
|
val firstTxId = SecureHash.randomSHA256()
|
||||||
|
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
|
||||||
|
val result = provider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
|
||||||
|
assertEquals(UniquenessProvider.Result.Success, result)
|
||||||
|
|
||||||
|
// Re-notarisation works outside the specified time window.
|
||||||
|
testClock.advanceBy(90.minutes)
|
||||||
|
val result2 = provider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
|
||||||
|
assertEquals(UniquenessProvider.Result.Success, result2)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `handles reference states`() {
|
||||||
|
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||||
|
val inputState1 = generateStateRef()
|
||||||
|
val inputState2 = generateStateRef()
|
||||||
|
val firstTxId = SecureHash.randomSHA256()
|
||||||
|
val secondTxId = SecureHash.randomSHA256()
|
||||||
|
|
||||||
|
// Conflict free transaction goes through.
|
||||||
|
val result1 = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, references = listOf(inputState2)).get()
|
||||||
|
assertEquals(UniquenessProvider.Result.Success, result1)
|
||||||
|
|
||||||
|
// Referencing a spent state results in a conflict.
|
||||||
|
val result2 = provider.commit(listOf(inputState2), secondTxId, identity, requestSignature, references = listOf(inputState1)).get()
|
||||||
|
val error = (result2 as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||||
|
val conflictCause = error.consumedStates[inputState1]!!
|
||||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||||
|
assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type)
|
||||||
|
|
||||||
|
// Re-notarisation works.
|
||||||
|
val result3 = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, references = listOf(inputState2)).get()
|
||||||
|
assertEquals(UniquenessProvider.Result.Success, result3)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `handles transaction with reference states only`() {
|
||||||
|
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||||
|
val inputState1 = generateStateRef()
|
||||||
|
val firstTxId = SecureHash.randomSHA256()
|
||||||
|
val secondTxId = SecureHash.randomSHA256()
|
||||||
|
val thirdTxId = SecureHash.randomSHA256()
|
||||||
|
|
||||||
|
// Conflict free transaction goes through.
|
||||||
|
val result1 = provider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(inputState1)).get()
|
||||||
|
assertEquals(UniquenessProvider.Result.Success, result1)
|
||||||
|
|
||||||
|
// Commit state 1.
|
||||||
|
val result2 = provider.commit(listOf(inputState1), secondTxId, identity, requestSignature).get()
|
||||||
|
assertEquals(UniquenessProvider.Result.Success, result2)
|
||||||
|
|
||||||
|
// Re-notarisation works.
|
||||||
|
val result3 = provider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(inputState1)).get()
|
||||||
|
assertEquals(UniquenessProvider.Result.Success, result3)
|
||||||
|
|
||||||
|
// Transaction referencing the spent sate fails.
|
||||||
|
val result4 = provider.commit(emptyList(), thirdTxId, identity, requestSignature, references = listOf(inputState1)).get()
|
||||||
|
val error = (result4 as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||||
|
val conflictCause = error.consumedStates[inputState1]!!
|
||||||
|
assertEquals(conflictCause.hashOfTransactionId, secondTxId.sha256())
|
||||||
|
assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user