diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 344ab468c1..1259adf2d8 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -57,7 +57,10 @@ class NodeSchemaService(private val extraSchemas: Set = emptySet() Pair(NodeCoreV1, SchemaOptions())) fun internalSchemas() = requiredSchemas.keys + extraSchemas.filter { schema -> // when mapped schemas from the finance module are present, they are considered as internal ones - schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" || schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" } + schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" || + schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" || + schema::class.qualifiedName == "net.corda.notary.jpa.JPANotarySchemaV1" + } override val schemaOptions: Map = requiredSchemas + extraSchemas.associateBy({ it }, { SchemaOptions() }) diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt index 6172b9f18f..4bdb4e38d0 100644 --- a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt +++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt @@ -89,6 +89,13 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va @Column(name = "consuming_transaction_id", nullable = false, length = 64) val consumingTxHash: String) + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}jpa_notary_committed_transactions") + class CommittedTransaction( + @Id + val transactionId: String + ) + private val requestQueue = LinkedBlockingQueue(requestQueueSize) // TODO: Collect metrics. @@ -168,6 +175,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va for (cs in request.committedStatesEntities) { session.persist(cs) } + session.persist(CommittedTransaction(request.txId.toString())) } } @@ -185,7 +193,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va val stateRef = decodeStateRef(it.id) val consumingTxId = SecureHash.parse(it.consumingTxHash) if (stateRef in references) { - stateRef to StateConsumptionDetails(hashOfTransactionId = consumingTxId, type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) + stateRef to StateConsumptionDetails(consumingTxId.sha256(), type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) } else { stateRef to StateConsumptionDetails(consumingTxId.sha256()) } @@ -215,17 +223,22 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va return findAlreadyCommitted(session, allInputs, references).toMutableMap() } - private fun processRequest(request: CommitRequest, allConflicts: MutableMap, toCommit: MutableList): UniquenessProvider.Result { + private fun processRequest(session: Session, request: CommitRequest, allConflicts: MutableMap, toCommit: MutableList): UniquenessProvider.Result { val conflicts = (request.states + request.references).mapNotNull { if (allConflicts.containsKey(it)) it to allConflicts[it]!! else null }.toMap() + val result = if (conflicts.isNotEmpty()) { if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) { UniquenessProvider.Result.Success } else { - UniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts)) + if (request.states.isEmpty() && isPreviouslyNotarised(session, request.txId)) { + UniquenessProvider.Result.Success + } else { + UniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts)) + } } } else { val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow) @@ -237,12 +250,20 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va } UniquenessProvider.Result.Success } else { - UniquenessProvider.Result.Failure(outsideTimeWindowError) + if (request.states.isEmpty() && isPreviouslyNotarised(session, request.txId)) { + UniquenessProvider.Result.Success + } else { + UniquenessProvider.Result.Failure(outsideTimeWindowError) + } } } return result } + private fun isPreviouslyNotarised(session: Session, txId: SecureHash): Boolean { + return session.find(CommittedTransaction::class.java, txId.toString()) != null + } + private fun processRequests(requests: List) { try { // Note that there is an additional retry mechanism within the transaction itself. @@ -255,7 +276,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va val allConflicts = findAllConflicts(session, requests) val results = requests.map { request -> - processRequest(request, allConflicts, toCommit) + processRequest(session, request, allConflicts, toCommit) } logRequests(requests) commitRequests(session, toCommit) diff --git a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/Schema.kt b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/Schema.kt index 7c9a614099..7c4b3eaf2d 100644 --- a/notary/jpa/src/main/kotlin/net/corda/notary/jpa/Schema.kt +++ b/notary/jpa/src/main/kotlin/net/corda/notary/jpa/Schema.kt @@ -9,7 +9,8 @@ object JPANotarySchemaV1 : MappedSchema( version = 1, mappedTypes = listOf( JPAUniquenessProvider.CommittedState::class.java, - JPAUniquenessProvider.Request::class.java + JPAUniquenessProvider.Request::class.java, + JPAUniquenessProvider.CommittedTransaction::class.java ) ) { override val migrationResource: String? diff --git a/notary/jpa/src/main/resources/migration/notary-jpa.changelog-create-committed-transactions-table.xml b/notary/jpa/src/main/resources/migration/notary-jpa.changelog-create-committed-transactions-table.xml new file mode 100644 index 0000000000..faae3def27 --- /dev/null +++ b/notary/jpa/src/main/resources/migration/notary-jpa.changelog-create-committed-transactions-table.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + diff --git a/notary/jpa/src/main/resources/migration/notary-jpa.changelog-master.xml b/notary/jpa/src/main/resources/migration/notary-jpa.changelog-master.xml index 092e973a8f..a1678cc668 100644 --- a/notary/jpa/src/main/resources/migration/notary-jpa.changelog-master.xml +++ b/notary/jpa/src/main/resources/migration/notary-jpa.changelog-master.xml @@ -4,5 +4,6 @@ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"> + diff --git a/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt b/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt index 453b689cd2..5ee763376b 100644 --- a/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt +++ b/notary/jpa/src/test/kotlin/net/corda/notary/jpa/JPAUniquenessProviderTests.kt @@ -1,13 +1,16 @@ package net.corda.notary.jpa +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.NullKeys import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError +import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.CordaX500Name import net.corda.core.internal.notary.UniquenessProvider +import net.corda.core.utilities.minutes import net.corda.node.services.schema.NodeSchemaService import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -19,6 +22,7 @@ import net.corda.testing.core.generateStateRef import net.corda.testing.internal.LogHelper import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.testing.node.TestClock import org.junit.After import org.junit.Before import org.junit.Rule @@ -99,4 +103,92 @@ class JPAUniquenessProviderTests { val error = (secondResult as UniquenessProvider.Result.Failure).error as NotaryError.Conflict assertEquals(nrStates, error.consumedStates.size) } + + @Test + fun `handles reference states`() { + val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig) + val inputState1 = generateStateRef() + val inputState2 = generateStateRef() + val firstTxId = SecureHash.randomSHA256() + val secondTxId = SecureHash.randomSHA256() + + // Conflict free transaction goes through. + val result1 = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, references = listOf(inputState2)).get() + assertEquals(UniquenessProvider.Result.Success, result1) + + // Referencing a spent state results in a conflict. + val result2 = provider.commit(listOf(inputState2), secondTxId, identity, requestSignature, references = listOf(inputState1)).get() + val error = (result2 as UniquenessProvider.Result.Failure).error as NotaryError.Conflict + val conflictCause = error.consumedStates[inputState1]!! + assertEquals(firstTxId.sha256(), conflictCause.hashOfTransactionId) + assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type) + + // Re-notarisation works. + val result3 = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, references = listOf(inputState2)).get() + assertEquals(UniquenessProvider.Result.Success, result3) + } + + @Test + fun `handles transaction with reference states only`() { + val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig) + val inputState1 = generateStateRef() + val firstTxId = SecureHash.randomSHA256() + val secondTxId = SecureHash.randomSHA256() + val thirdTxId = SecureHash.randomSHA256() + + // Conflict free transaction goes through. + val result1 = provider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(inputState1)).get() + assertEquals(UniquenessProvider.Result.Success, result1) + + // Commit state 1. + val result2 = provider.commit(listOf(inputState1), secondTxId, identity, requestSignature).get() + assertEquals(UniquenessProvider.Result.Success, result2) + + // Re-notarisation works. + val result3 = provider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(inputState1)).get() + assertEquals(UniquenessProvider.Result.Success, result3) + + // Transaction referencing the spent sate fails. + val result4 = provider.commit(emptyList(), thirdTxId, identity, requestSignature, references = listOf(inputState1)).get() + val error = (result4 as UniquenessProvider.Result.Failure).error as NotaryError.Conflict + val conflictCause = error.consumedStates[inputState1]!! + assertEquals(conflictCause.hashOfTransactionId, secondTxId.sha256()) + assertEquals(conflictCause.type, StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) + } + + @Test + fun `rejects transaction with invalid time window`() { + val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig) + val inputState1 = generateStateRef() + val firstTxId = SecureHash.randomSHA256() + val timeWindow = TimeWindow.fromOnly(Clock.systemUTC().instant().plus(30.minutes)) + val result = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get() + val error = (result as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid + assertEquals(timeWindow, error.txTimeWindow) + } + + @Test + fun `handles transaction with valid time window`() { + val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig) + val inputState1 = generateStateRef() + val firstTxId = SecureHash.randomSHA256() + val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes)) + val result = provider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get() + assertEquals(UniquenessProvider.Result.Success, result) + } + + @Test + fun `handles transaction with valid time window without inputs`() { + val testClock = TestClock(Clock.systemUTC()) + val provider = JPAUniquenessProvider(testClock, database, notaryConfig) + val firstTxId = SecureHash.randomSHA256() + val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes)) + val result = provider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow).get() + assertEquals(UniquenessProvider.Result.Success, result) + + // Re-notarisation works outside the specified time window. + testClock.advanceBy(90.minutes) + val result2 = provider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow).get() + assertEquals(UniquenessProvider.Result.Success, result2) + } }